From 51c1f9bb1a6fb67cc905c3d327e82af49d179978 Mon Sep 17 00:00:00 2001 From: Gaius Date: Wed, 23 Aug 2023 10:27:35 +0800 Subject: [PATCH] feat: add createSyncPeers to async job in manager (#2664) Signed-off-by: Gaius --- manager/config/config.go | 3 ++ manager/job/job.go | 24 ++++++++++- manager/job/preheat.go | 6 +-- manager/job/sync_peers.go | 88 ++++++++++++++++++++++++++------------- manager/manager.go | 2 +- 5 files changed, 88 insertions(+), 35 deletions(-) diff --git a/manager/config/config.go b/manager/config/config.go index d85520fc7c6..c8a96e086ec 100644 --- a/manager/config/config.go +++ b/manager/config/config.go @@ -301,6 +301,9 @@ type SyncPeersConfig struct { // Interval is the interval for syncing all peers information from the scheduler and // display peers information in the manager console. Interval time.Duration `yaml:"interval" mapstructure:"interval"` + + // Timeout is the timeout for syncing peers information from the single scheduler. + Timeout time.Duration `yaml:"timeout" mapstructure:"timeout"` } type PreheatTLSClientConfig struct { diff --git a/manager/job/job.go b/manager/job/job.go index 8b19d8cb0fa..bac8c7a6e86 100644 --- a/manager/job/job.go +++ b/manager/job/job.go @@ -19,14 +19,19 @@ package job import ( "crypto/x509" "errors" + "time" "go.opentelemetry.io/otel" + "gorm.io/gorm" internaljob "d7y.io/dragonfly/v2/internal/job" "d7y.io/dragonfly/v2/manager/config" "d7y.io/dragonfly/v2/manager/models" ) +// DefaultTaskPollingInterval is the default interval for polling task. +const DefaultTaskPollingInterval = 5 * time.Second + // tracer is a global tracer for job. var tracer = otel.Tracer("manager") @@ -38,7 +43,7 @@ type Job struct { } // New returns a new Job. -func New(cfg *config.Config) (*Job, error) { +func New(cfg *config.Config, gdb *gorm.DB) (*Job, error) { j, err := internaljob.New(&internaljob.Config{ Addrs: cfg.Database.Redis.Addrs, MasterName: cfg.Database.Redis.MasterName, @@ -64,7 +69,7 @@ func New(cfg *config.Config) (*Job, error) { return nil, err } - syncPeers, err := newSyncPeers(j) + syncPeers, err := newSyncPeers(cfg, j, gdb) if err != nil { return nil, err } @@ -76,6 +81,16 @@ func New(cfg *config.Config) (*Job, error) { }, nil } +// Serve starts the job server. +func (j *Job) Serve() { + j.SyncPeers.Serve() +} + +// Stop stops the job server. +func (j *Job) Stop() { + j.SyncPeers.Stop() +} + // getSchedulerQueues gets scheduler queues. func getSchedulerQueues(schedulers []models.Scheduler) []internaljob.Queue { var queues []internaljob.Queue @@ -90,3 +105,8 @@ func getSchedulerQueues(schedulers []models.Scheduler) []internaljob.Queue { return queues } + +// getSchedulerQueue gets scheduler queue. +func getSchedulerQueue(scheduler models.Scheduler) (internaljob.Queue, error) { + return internaljob.GetSchedulerQueue(scheduler.SchedulerClusterID, scheduler.Hostname) +} diff --git a/manager/job/preheat.go b/manager/job/preheat.go index 55e744e1ec1..e0c1b68c8b6 100644 --- a/manager/job/preheat.go +++ b/manager/job/preheat.go @@ -98,14 +98,14 @@ func (p *preheat) CreatePreheat(ctx context.Context, schedulers []models.Schedul filter := json.Filter rawheader := json.Headers - // Initialize queues + // Initialize queues. queues := getSchedulerQueues(schedulers) - // Generate download files + // Generate download files. var files []internaljob.PreheatRequest switch PreheatType(json.Type) { case PreheatImageType: - // Parse image manifest url + // Parse image manifest url. image, err := parseAccessURL(url) if err != nil { return nil, err diff --git a/manager/job/sync_peers.go b/manager/job/sync_peers.go index 5086a8d5ef5..349a045229f 100644 --- a/manager/job/sync_peers.go +++ b/manager/job/sync_peers.go @@ -26,6 +26,7 @@ import ( machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks" "github.com/google/uuid" "go.opentelemetry.io/otel/trace" + "gorm.io/gorm" logger "d7y.io/dragonfly/v2/internal/dflog" internaljob "d7y.io/dragonfly/v2/internal/job" @@ -44,61 +45,90 @@ type SyncPeers interface { // syncPeers is an implementation of SyncPeers. type syncPeers struct { - job *internaljob.Job + config *config.Config + job *internaljob.Job + db *gorm.DB + done chan struct{} } // newSyncPeers returns a new SyncPeers. -func newSyncPeers(job *internaljob.Job) (SyncPeers, error) { - return &syncPeers{job}, nil +func newSyncPeers(cfg *config.Config, job *internaljob.Job, gdb *gorm.DB) (SyncPeers, error) { + return &syncPeers{ + config: cfg, + db: gdb, + job: job, + done: make(chan struct{}), + }, nil } // TODO Implement function. // Started sync peers server. func (s *syncPeers) Serve() { + tick := time.NewTicker(s.config.Job.SyncPeers.Interval) + for { + select { + case <-tick.C: + // Find all of the scheduler clusters that has active schedulers. + var candidateSchedulerClusters []models.SchedulerCluster + if err := s.db.WithContext(context.Background()).Find(&candidateSchedulerClusters).Error; err != nil { + logger.Errorf("find candidate scheduler clusters failed: %v", err) + break + } + + var candidateSchedulers []models.Scheduler + for _, candidateSchedulerCluster := range candidateSchedulerClusters { + var schedulers []models.Scheduler + if err := s.db.WithContext(context.Background()).Preload("SchedulerCluster").Find(&schedulers, models.Scheduler{ + SchedulerClusterID: candidateSchedulerCluster.ID, + State: models.SchedulerStateActive, + }).Error; err != nil { + continue + } + + candidateSchedulers = append(candidateSchedulers, schedulers...) + } + + for _, scheduler := range candidateSchedulers { + if _, err := s.createSyncPeers(context.Background(), scheduler); err != nil { + logger.Error(err) + } + } + case <-s.done: + return + } + } } -// TODO Implement function. // Stop sync peers server. func (s *syncPeers) Stop() { + close(s.done) } // createSyncPeers creates sync peers. -func (s *syncPeers) createSyncPeers(ctx context.Context, schedulers []models.Scheduler) (*internaljob.GroupJobState, error) { +func (s *syncPeers) createSyncPeers(ctx context.Context, scheduler models.Scheduler) (any, error) { var span trace.Span ctx, span = tracer.Start(ctx, config.SpanSyncPeers, trace.WithSpanKind(trace.SpanKindProducer)) defer span.End() - // Initialize queues - queues := getSchedulerQueues(schedulers) - - var signatures []*machineryv1tasks.Signature - for _, queue := range queues { - signatures = append(signatures, &machineryv1tasks.Signature{ - UUID: fmt.Sprintf("task_%s", uuid.New().String()), - Name: internaljob.SyncPeersJob, - RoutingKey: queue.String(), - }) - } - - group, err := machineryv1tasks.NewGroup(signatures...) + // Initialize queue. + queue, err := getSchedulerQueue(scheduler) if err != nil { return nil, err } - var tasks []machineryv1tasks.Signature - for _, signature := range signatures { - tasks = append(tasks, *signature) + // Initialize task signature. + task := &machineryv1tasks.Signature{ + UUID: fmt.Sprintf("task_%s", uuid.New().String()), + Name: internaljob.SyncPeersJob, + RoutingKey: queue.String(), } - logger.Infof("create sync peers group %s in queues %v, tasks: %#v", group.GroupUUID, queues, tasks) - if _, err := s.job.Server.SendGroupWithContext(ctx, group, 0); err != nil { - logger.Errorf("create sync peers group %s failed", group.GroupUUID, err) + logger.Infof("create sync peers in queue %v, task: %#v", queue, task) + asyncResult, err := s.job.Server.SendTaskWithContext(ctx, task) + if err != nil { + logger.Errorf("create sync peers in queue %v failed", queue, err) return nil, err } - return &internaljob.GroupJobState{ - GroupUUID: group.GroupUUID, - State: machineryv1tasks.StatePending, - CreatedAt: time.Now(), - }, nil + return asyncResult.GetWithTimeout(s.config.Job.SyncPeers.Timeout, DefaultTaskPollingInterval) } diff --git a/manager/manager.go b/manager/manager.go index 0f211284296..b4480a2b121 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -125,7 +125,7 @@ func New(cfg *config.Config, d dfpath.Dfpath) (*Server, error) { searcher := searcher.New(d.PluginDir()) // Initialize job - job, err := job.New(cfg) + job, err := job.New(cfg, db.DB) if err != nil { return nil, err }