Skip to content

Commit

Permalink
feat: add createSyncPeers to async job in manager (#2664)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Aug 23, 2023
1 parent dfde8bd commit 51c1f9b
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 35 deletions.
3 changes: 3 additions & 0 deletions manager/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,9 @@ type SyncPeersConfig struct {
// Interval is the interval for syncing all peers information from the scheduler and
// display peers information in the manager console.
Interval time.Duration `yaml:"interval" mapstructure:"interval"`

// Timeout is the timeout for syncing peers information from the single scheduler.
Timeout time.Duration `yaml:"timeout" mapstructure:"timeout"`
}

type PreheatTLSClientConfig struct {
Expand Down
24 changes: 22 additions & 2 deletions manager/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,19 @@ package job
import (
"crypto/x509"
"errors"
"time"

"go.opentelemetry.io/otel"
"gorm.io/gorm"

internaljob "d7y.io/dragonfly/v2/internal/job"
"d7y.io/dragonfly/v2/manager/config"
"d7y.io/dragonfly/v2/manager/models"
)

// DefaultTaskPollingInterval is the default interval for polling task.
// It is used as the polling step when waiting on an async job result
// with a timeout (see syncPeers.createSyncPeers).
const DefaultTaskPollingInterval = 5 * time.Second

// tracer is a global tracer for job.
var tracer = otel.Tracer("manager")

Expand All @@ -38,7 +43,7 @@ type Job struct {
}

// New returns a new Job.
func New(cfg *config.Config) (*Job, error) {
func New(cfg *config.Config, gdb *gorm.DB) (*Job, error) {
j, err := internaljob.New(&internaljob.Config{
Addrs: cfg.Database.Redis.Addrs,
MasterName: cfg.Database.Redis.MasterName,
Expand All @@ -64,7 +69,7 @@ func New(cfg *config.Config) (*Job, error) {
return nil, err
}

syncPeers, err := newSyncPeers(j)
syncPeers, err := newSyncPeers(cfg, j, gdb)
if err != nil {
return nil, err
}
Expand All @@ -76,6 +81,16 @@ func New(cfg *config.Config) (*Job, error) {
}, nil
}

// Serve starts the job server. It currently only runs the sync-peers
// background loop, which blocks until Stop is called.
func (j *Job) Serve() {
	j.SyncPeers.Serve()
}

// Stop stops the job server by shutting down the sync-peers loop.
// It should be called at most once, after Serve has been started.
func (j *Job) Stop() {
	j.SyncPeers.Stop()
}

// getSchedulerQueues gets scheduler queues.
func getSchedulerQueues(schedulers []models.Scheduler) []internaljob.Queue {
var queues []internaljob.Queue
Expand All @@ -90,3 +105,8 @@ func getSchedulerQueues(schedulers []models.Scheduler) []internaljob.Queue {

return queues
}

// getSchedulerQueue gets the job queue for a single scheduler, derived
// from its cluster ID and hostname.
func getSchedulerQueue(scheduler models.Scheduler) (internaljob.Queue, error) {
	return internaljob.GetSchedulerQueue(scheduler.SchedulerClusterID, scheduler.Hostname)
}
6 changes: 3 additions & 3 deletions manager/job/preheat.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,14 @@ func (p *preheat) CreatePreheat(ctx context.Context, schedulers []models.Schedul
filter := json.Filter
rawheader := json.Headers

// Initialize queues
// Initialize queues.
queues := getSchedulerQueues(schedulers)

// Generate download files
// Generate download files.
var files []internaljob.PreheatRequest
switch PreheatType(json.Type) {
case PreheatImageType:
// Parse image manifest url
// Parse image manifest url.
image, err := parseAccessURL(url)
if err != nil {
return nil, err
Expand Down
88 changes: 59 additions & 29 deletions manager/job/sync_peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks"
"github.com/google/uuid"
"go.opentelemetry.io/otel/trace"
"gorm.io/gorm"

logger "d7y.io/dragonfly/v2/internal/dflog"
internaljob "d7y.io/dragonfly/v2/internal/job"
Expand All @@ -44,61 +45,90 @@ type SyncPeers interface {

// syncPeers is an implementation of SyncPeers.
type syncPeers struct {
job *internaljob.Job
config *config.Config
job *internaljob.Job
db *gorm.DB
done chan struct{}
}

// newSyncPeers returns a new SyncPeers.
func newSyncPeers(job *internaljob.Job) (SyncPeers, error) {
return &syncPeers{job}, nil
// newSyncPeers returns a new SyncPeers backed by the given config,
// job client, and database handle. The done channel signals Serve to
// exit when Stop closes it.
func newSyncPeers(cfg *config.Config, job *internaljob.Job, gdb *gorm.DB) (SyncPeers, error) {
	sp := &syncPeers{
		config: cfg,
		job:    job,
		db:     gdb,
		done:   make(chan struct{}),
	}

	return sp, nil
}

// TODO Implement function.
// Serve starts the sync peers server. On every configured interval it
// looks up all scheduler clusters, collects their active schedulers,
// and creates a sync-peers job for each one. It blocks until Stop is
// called.
func (s *syncPeers) Serve() {
	tick := time.NewTicker(s.config.Job.SyncPeers.Interval)
	// Stop the ticker when Serve returns so its resources are released.
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			// Find all of the scheduler clusters that have active schedulers.
			var candidateSchedulerClusters []models.SchedulerCluster
			if err := s.db.WithContext(context.Background()).Find(&candidateSchedulerClusters).Error; err != nil {
				logger.Errorf("find candidate scheduler clusters failed: %v", err)
				break
			}

			var candidateSchedulers []models.Scheduler
			for _, candidateSchedulerCluster := range candidateSchedulerClusters {
				var schedulers []models.Scheduler
				if err := s.db.WithContext(context.Background()).Preload("SchedulerCluster").Find(&schedulers, models.Scheduler{
					SchedulerClusterID: candidateSchedulerCluster.ID,
					State:              models.SchedulerStateActive,
				}).Error; err != nil {
					// Log instead of silently skipping the cluster so failures are visible.
					logger.Errorf("find active schedulers in cluster %v failed: %v", candidateSchedulerCluster.ID, err)
					continue
				}

				candidateSchedulers = append(candidateSchedulers, schedulers...)
			}

			for _, scheduler := range candidateSchedulers {
				if _, err := s.createSyncPeers(context.Background(), scheduler); err != nil {
					logger.Error(err)
				}
			}
		case <-s.done:
			return
		}
	}
}

// TODO Implement function.
// Stop sync peers server. Closing the done channel signals Serve to
// return; calling Stop more than once would panic on the closed channel,
// so it must be called at most once.
func (s *syncPeers) Stop() {
	close(s.done)
}

// createSyncPeers creates sync peers.
func (s *syncPeers) createSyncPeers(ctx context.Context, schedulers []models.Scheduler) (*internaljob.GroupJobState, error) {
func (s *syncPeers) createSyncPeers(ctx context.Context, scheduler models.Scheduler) (any, error) {
var span trace.Span
ctx, span = tracer.Start(ctx, config.SpanSyncPeers, trace.WithSpanKind(trace.SpanKindProducer))
defer span.End()

// Initialize queues
queues := getSchedulerQueues(schedulers)

var signatures []*machineryv1tasks.Signature
for _, queue := range queues {
signatures = append(signatures, &machineryv1tasks.Signature{
UUID: fmt.Sprintf("task_%s", uuid.New().String()),
Name: internaljob.SyncPeersJob,
RoutingKey: queue.String(),
})
}

group, err := machineryv1tasks.NewGroup(signatures...)
// Initialize queue.
queue, err := getSchedulerQueue(scheduler)
if err != nil {
return nil, err
}

var tasks []machineryv1tasks.Signature
for _, signature := range signatures {
tasks = append(tasks, *signature)
// Initialize task signature.
task := &machineryv1tasks.Signature{
UUID: fmt.Sprintf("task_%s", uuid.New().String()),
Name: internaljob.SyncPeersJob,
RoutingKey: queue.String(),
}

logger.Infof("create sync peers group %s in queues %v, tasks: %#v", group.GroupUUID, queues, tasks)
if _, err := s.job.Server.SendGroupWithContext(ctx, group, 0); err != nil {
logger.Errorf("create sync peers group %s failed", group.GroupUUID, err)
logger.Infof("create sync peers in queue %v, task: %#v", queue, task)
asyncResult, err := s.job.Server.SendTaskWithContext(ctx, task)
if err != nil {
logger.Errorf("create sync peers in queue %v failed", queue, err)
return nil, err
}

return &internaljob.GroupJobState{
GroupUUID: group.GroupUUID,
State: machineryv1tasks.StatePending,
CreatedAt: time.Now(),
}, nil
return asyncResult.GetWithTimeout(s.config.Job.SyncPeers.Timeout, DefaultTaskPollingInterval)
}
2 changes: 1 addition & 1 deletion manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func New(cfg *config.Config, d dfpath.Dfpath) (*Server, error) {
searcher := searcher.New(d.PluginDir())

// Initialize job
job, err := job.New(cfg)
job, err := job.New(cfg, db.DB)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 51c1f9b

Please sign in to comment.