Skip to content

Commit

Permalink
feat: database adds seed operation to compatible old version
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi committed Mar 31, 2023
1 parent 8b23d60 commit fea0ad6
Show file tree
Hide file tree
Showing 8 changed files with 254 additions and 134 deletions.
26 changes: 26 additions & 0 deletions manager/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package database

import (
"errors"
"fmt"

"github.com/go-redis/redis/v8"
Expand All @@ -25,6 +26,7 @@ import (
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/manager/config"
"d7y.io/dragonfly/v2/manager/models"
"d7y.io/dragonfly/v2/manager/types"
schedulerconfig "d7y.io/dragonfly/v2/scheduler/config"
)

Expand Down Expand Up @@ -93,6 +95,7 @@ func migrate(db *gorm.DB) error {
}

func seed(cfg *config.Config, db *gorm.DB) error {
// Create default scheduler cluster.
var schedulerClusterCount int64
if err := db.Model(models.SchedulerCluster{}).Count(&schedulerClusterCount).Error; err != nil {
return err
Expand All @@ -119,6 +122,7 @@ func seed(cfg *config.Config, db *gorm.DB) error {
}
}

// Create default seed peer cluster.
var seedPeerClusterCount int64
if err := db.Model(models.SeedPeerCluster{}).Count(&seedPeerClusterCount).Error; err != nil {
return err
Expand Down Expand Up @@ -153,5 +157,27 @@ func seed(cfg *config.Config, db *gorm.DB) error {
}
}

// TODO Compatible with old version.
// Update scheduler features when features is NULL.
var schedulers []models.Scheduler
if err := db.Model(models.Scheduler{}).Find(&schedulers).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil
}

return err
}

for _, scheduler := range schedulers {
if scheduler.Features == nil {
if err := db.Model(&scheduler).Update("features", models.Array(types.DefaultSchedulerFeatures)).Error; err != nil {
logger.Errorf("update scheduler %d features: %s", scheduler.ID, err.Error())
continue
}

logger.Infof("update scheduler %d default features", scheduler.ID)
}
}

return nil
}
69 changes: 34 additions & 35 deletions manager/models/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,7 @@ func Paginate(page, perPage int) func(db *gorm.DB) *gorm.DB {
}
}

type (
JSONMap map[string]any
Array []string
)
type JSONMap map[string]any

func (m JSONMap) Value() (driver.Value, error) {
if m == nil {
Expand All @@ -55,14 +52,6 @@ func (m JSONMap) Value() (driver.Value, error) {
return string(ba), err
}

func (a Array) Value() (driver.Value, error) {
if a == nil {
return nil, nil
}
ba, err := a.MarshalJSON()
return string(ba), err
}

func (m *JSONMap) Scan(val any) error {
var ba []byte
switch v := val.(type) {
Expand All @@ -79,6 +68,39 @@ func (m *JSONMap) Scan(val any) error {
return err
}

func (m JSONMap) MarshalJSON() ([]byte, error) {
if m == nil {
return []byte("null"), nil
}
t := (map[string]any)(m)
return json.Marshal(t)
}

func (m *JSONMap) UnmarshalJSON(b []byte) error {
t := map[string]any{}
err := json.Unmarshal(b, &t)
*m = JSONMap(t)
return err
}

func (m JSONMap) GormDataType() string {
return "jsonmap"
}

func (JSONMap) GormDBDataType(db *gorm.DB, field *schema.Field) string {
return "text"
}

type Array []string

func (a Array) Value() (driver.Value, error) {
if a == nil {
return nil, nil
}
ba, err := a.MarshalJSON()
return string(ba), err
}

func (a *Array) Scan(val any) error {
var ba []byte
switch v := val.(type) {
Expand All @@ -95,14 +117,6 @@ func (a *Array) Scan(val any) error {
return err
}

func (m JSONMap) MarshalJSON() ([]byte, error) {
if m == nil {
return []byte("null"), nil
}
t := (map[string]any)(m)
return json.Marshal(t)
}

func (a Array) MarshalJSON() ([]byte, error) {
if a == nil {
return []byte("null"), nil
Expand All @@ -111,28 +125,13 @@ func (a Array) MarshalJSON() ([]byte, error) {
return json.Marshal(t)
}

func (m *JSONMap) UnmarshalJSON(b []byte) error {
t := map[string]any{}
err := json.Unmarshal(b, &t)
*m = JSONMap(t)
return err
}

func (a *Array) UnmarshalJSON(b []byte) error {
t := []string{}
err := json.Unmarshal(b, &t)
*a = Array(t)
return err
}

func (m JSONMap) GormDataType() string {
return "jsonmap"
}

func (JSONMap) GormDBDataType(db *gorm.DB, field *schema.Field) string {
return "text"
}

func (Array) GormDataType() string {
return "array"
}
Expand Down
2 changes: 1 addition & 1 deletion manager/models/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Scheduler struct {
IP string `gorm:"column:ip;type:varchar(256);not null;comment:ip address" json:"ip"`
Port int32 `gorm:"column:port;not null;comment:grpc service listening port" json:"port"`
State string `gorm:"column:state;type:varchar(256);default:'inactive';comment:service state" json:"state"`
Features JSONMap `gorm:"column:features;comment:feature flags" json:"features"`
Features Array `gorm:"column:features;comment:feature flags" json:"features"`
SchedulerClusterID uint `gorm:"index:uk_scheduler,unique;not null;comment:scheduler cluster id"`
SchedulerCluster SchedulerCluster `json:"-"`
Models []Model `json:"-"`
Expand Down
50 changes: 38 additions & 12 deletions manager/rpcserver/manager_server_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (
"d7y.io/dragonfly/v2/manager/types"
pkgcache "d7y.io/dragonfly/v2/pkg/cache"
"d7y.io/dragonfly/v2/pkg/objectstorage"
"d7y.io/dragonfly/v2/pkg/structure"
"d7y.io/dragonfly/v2/pkg/slices"
)

// managerServerV1 is v1 version of the manager grpc server.
Expand Down Expand Up @@ -125,6 +125,12 @@ func (s *managerServerV1) GetSeedPeer(ctx context.Context, req *managerv1.GetSee
var pbSchedulers []*managerv1.Scheduler
for _, schedulerCluster := range seedPeer.SeedPeerCluster.SchedulerClusters {
for _, scheduler := range schedulerCluster.Schedulers {
// Marshal features of scheduler.
features, err := scheduler.Features.MarshalJSON()
if err != nil {
return nil, status.Error(codes.DataLoss, err.Error())
}

pbSchedulers = append(pbSchedulers, &managerv1.Scheduler{
Id: uint64(scheduler.ID),
Hostname: scheduler.Hostname,
Expand All @@ -133,6 +139,7 @@ func (s *managerServerV1) GetSeedPeer(ctx context.Context, req *managerv1.GetSee
Ip: scheduler.IP,
Port: scheduler.Port,
State: scheduler.State,
Features: features,
})
}
}
Expand Down Expand Up @@ -416,32 +423,33 @@ func (s *managerServerV1) UpdateScheduler(ctx context.Context, req *managerv1.Up
Ip: scheduler.IP,
Port: scheduler.Port,
Features: features,
SchedulerClusterId: uint64(scheduler.SchedulerClusterID),
State: scheduler.State,
SchedulerClusterId: uint64(scheduler.SchedulerClusterID),
}, nil
}

// Create scheduler and associate cluster.
func (s *managerServerV1) createScheduler(ctx context.Context, req *managerv1.UpdateSchedulerRequest) (*managerv1.Scheduler, error) {
features, err := structure.StructToMap(types.DefaultSchedulerFeatures)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

scheduler := models.Scheduler{
Hostname: req.Hostname,
IDC: req.Idc,
Location: req.Location,
IP: req.Ip,
Port: req.Port,
Features: features,
Features: types.DefaultSchedulerFeatures,
SchedulerClusterID: uint(req.SchedulerClusterId),
}

if err := s.db.WithContext(ctx).Create(&scheduler).Error; err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

// Marshal features of scheduler.
features, err := scheduler.Features.MarshalJSON()
if err != nil {
return nil, status.Error(codes.DataLoss, err.Error())
}

return &managerv1.Scheduler{
Id: uint64(scheduler.ID),
Hostname: scheduler.Hostname,
Expand All @@ -450,6 +458,7 @@ func (s *managerServerV1) createScheduler(ctx context.Context, req *managerv1.Up
Ip: scheduler.IP,
Port: scheduler.Port,
State: scheduler.State,
Features: features,
SchedulerClusterId: uint64(scheduler.SchedulerClusterID),
}, nil
}
Expand Down Expand Up @@ -484,13 +493,30 @@ func (s *managerServerV1) ListSchedulers(ctx context.Context, req *managerv1.Lis
return &pbListSchedulersResponse, nil
}

// Cache miss.
// Cache miss and search scheduler cluster.
var schedulerClusters []models.SchedulerCluster
if err := s.db.WithContext(ctx).Preload("SecurityGroup.SecurityRules").Preload("SeedPeerClusters.SeedPeers", "state = ?", "active").
Preload("Schedulers", "state = ?", "active").Find(&schedulerClusters).Error; err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
log.Debugf("list scheduler clusters %v with hostInfo %#v", getSchedulerClusterNames(schedulerClusters), req.HostInfo)

// Remove schedulers which not have scehdule featrue. As OceanBase does not support JSON type,
// it is not possible to use datatypes.JSONQuery for filtering.
var tmpSchedulerClusters []models.SchedulerCluster
for _, schedulerCluster := range schedulerClusters {
var tmpSchedulers []models.Scheduler
for _, scheduler := range schedulerCluster.Schedulers {
if slices.Contains(scheduler.Features, types.SchedulerFeatureSchedule) {
tmpSchedulers = append(tmpSchedulers, scheduler)
}
}

if len(tmpSchedulers) != 0 {
schedulerCluster.Schedulers = tmpSchedulers
tmpSchedulerClusters = append(tmpSchedulerClusters, schedulerCluster)
}
}
log.Debugf("list scheduler clusters %v with hostInfo %#v", getSchedulerClusterNames(tmpSchedulerClusters), req.HostInfo)

// Search optimal scheduler clusters.
// If searcher can not found candidate scheduler cluster,
Expand All @@ -499,13 +525,13 @@ func (s *managerServerV1) ListSchedulers(ctx context.Context, req *managerv1.Lis
candidateSchedulerClusters []models.SchedulerCluster
err error
)
candidateSchedulerClusters, err = s.searcher.FindSchedulerClusters(ctx, schedulerClusters, req.Hostname, req.Ip, req.HostInfo, logger.CoreLogger)
candidateSchedulerClusters, err = s.searcher.FindSchedulerClusters(ctx, tmpSchedulerClusters, req.Hostname, req.Ip, req.HostInfo, logger.CoreLogger)
if err != nil {
log.Error(err)
metrics.SearchSchedulerClusterFailureCount.WithLabelValues(req.Version, req.Commit).Inc()
candidateSchedulerClusters = schedulerClusters
}
log.Debugf("find matching scheduler cluster %v", getSchedulerClusterNames(schedulerClusters))
log.Debugf("find matching scheduler cluster %v", getSchedulerClusterNames(candidateSchedulerClusters))

schedulers := []models.Scheduler{}
for _, candidateSchedulerCluster := range candidateSchedulerClusters {
Expand Down
Loading

0 comments on commit fea0ad6

Please sign in to comment.