Skip to content

Commit

Permalink
feat: add sync peer job for scheduler (#2663)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Aug 22, 2023
1 parent 036e32c commit dfde8bd
Show file tree
Hide file tree
Showing 18 changed files with 384 additions and 27 deletions.
4 changes: 4 additions & 0 deletions internal/job/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ const (

// Job Name.
const (
// PreheatJob is the name of preheat job.
PreheatJob = "preheat"

// SyncPeersJob is the name of syncing peers job.
SyncPeersJob = "sync_peers"
)

// Machinery server configuration.
Expand Down
16 changes: 16 additions & 0 deletions manager/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,9 @@ type TCPListenPortRange struct {
type JobConfig struct {
// Preheat configuration.
Preheat PreheatConfig `yaml:"preheat" mapstructure:"preheat"`

// Sync peers configuration.
SyncPeers SyncPeersConfig `yaml:"syncPeers" mapstructure:"syncPeers"`
}

type PreheatConfig struct {
Expand All @@ -294,6 +297,12 @@ type PreheatConfig struct {
TLS *PreheatTLSClientConfig `yaml:"tls" mapstructure:"tls"`
}

type SyncPeersConfig struct {
// Interval is the interval for syncing all peers information from the scheduler and
// display peers information in the manager console.
Interval time.Duration `yaml:"interval" mapstructure:"interval"`
}

type PreheatTLSClientConfig struct {
// CACert is the CA certificate for preheat tls handshake, it can be path or PEM format string.
CACert types.PEMContent `yaml:"caCert" mapstructure:"caCert"`
Expand Down Expand Up @@ -427,6 +436,9 @@ func New() *Config {
Preheat: PreheatConfig{
RegistryTimeout: DefaultJobPreheatRegistryTimeout,
},
SyncPeers: SyncPeersConfig{
Interval: DefaultJobSyncPeersInterval,
},
},
ObjectStorage: ObjectStorageConfig{
Enable: false,
Expand Down Expand Up @@ -607,6 +619,10 @@ func (cfg *Config) Validate() error {
return errors.New("preheat requires parameter registryTimeout")
}

if cfg.Job.SyncPeers.Interval <= MinJobSyncPeersInterval {
return errors.New("syncPeers requires parameter interval and it must be greater than 12 hours")
}

if cfg.ObjectStorage.Enable {
if cfg.ObjectStorage.Name == "" {
return errors.New("objectStorage requires parameter name")
Expand Down
18 changes: 18 additions & 0 deletions manager/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,9 @@ func TestConfig_Load(t *testing.T) {
CACert: "foo",
},
},
SyncPeers: SyncPeersConfig{
Interval: 13 * time.Hour,
},
},
ObjectStorage: ObjectStorageConfig{
Enable: true,
Expand Down Expand Up @@ -741,6 +744,21 @@ func TestConfig_Validate(t *testing.T) {
assert.EqualError(err, "preheat requires parameter registryTimeout")
},
},
{
name: "syncPeers requires parameter interval",
config: New(),
mock: func(cfg *Config) {
cfg.Auth.JWT = mockJWTConfig
cfg.Database.Type = DatabaseTypeMysql
cfg.Database.Mysql = mockMysqlConfig
cfg.Database.Redis = mockRedisConfig
cfg.Job.SyncPeers.Interval = 11 * time.Hour
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.EqualError(err, "syncPeers requires parameter interval and it must be greater than 12 hours")
},
},
{
name: "objectStorage requires parameter name",
config: New(),
Expand Down
1 change: 1 addition & 0 deletions manager/config/constant_otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const (

const (
SpanPreheat = "preheat"
SpanSyncPeers = "sync-peers"
SpanGetLayers = "get-layers"
SpanAuthWithRegistry = "auth-with-registry"
)
6 changes: 6 additions & 0 deletions manager/config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ const (
const (
// DefaultJobPreheatRegistryTimeout is the default timeout for requesting registry to get token and manifest.
DefaultJobPreheatRegistryTimeout = 1 * time.Minute

// DefaultJobSyncPeersInterval is the default interval for syncing all peers information from the scheduler.
DefaultJobSyncPeersInterval = 24 * time.Hour

// MinJobSyncPeersInterval is the min interval for syncing all peers information from the scheduler.
MinJobSyncPeersInterval = 12 * time.Hour
)

const (
Expand Down
3 changes: 3 additions & 0 deletions manager/config/testdata/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ job:
registryTimeout: 1m
tls:
caCert: testdata/ca.crt
syncPeers:
interval: 13h

objectStorage:
enable: true
name: s3
Expand Down
36 changes: 33 additions & 3 deletions manager/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,24 @@ import (
"crypto/x509"
"errors"

"go.opentelemetry.io/otel"

internaljob "d7y.io/dragonfly/v2/internal/job"
"d7y.io/dragonfly/v2/manager/config"
"d7y.io/dragonfly/v2/manager/models"
)

// tracer is a global tracer for job.
var tracer = otel.Tracer("manager")

// Job is an implementation of job.
type Job struct {
*internaljob.Job
Preheat
SyncPeers
}

// New returns a new Job.
func New(cfg *config.Config) (*Job, error) {
j, err := internaljob.New(&internaljob.Config{
Addrs: cfg.Database.Redis.Addrs,
Expand All @@ -50,13 +59,34 @@ func New(cfg *config.Config) (*Job, error) {
}
}

p, err := newPreheat(j, cfg.Job.Preheat.RegistryTimeout, certPool)
preheat, err := newPreheat(j, cfg.Job.Preheat.RegistryTimeout, certPool)
if err != nil {
return nil, err
}

syncPeers, err := newSyncPeers(j)
if err != nil {
return nil, err
}

return &Job{
Job: j,
Preheat: p,
Job: j,
Preheat: preheat,
SyncPeers: syncPeers,
}, nil
}

// getSchedulerQueues gets scheduler queues.
func getSchedulerQueues(schedulers []models.Scheduler) []internaljob.Queue {
var queues []internaljob.Queue
for _, scheduler := range schedulers {
queue, err := internaljob.GetSchedulerQueue(scheduler.SchedulerClusterID, scheduler.Hostname)
if err != nil {
continue
}

queues = append(queues, queue)
}

return queues
}
58 changes: 58 additions & 0 deletions manager/job/mocks/sync_peers_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 15 additions & 17 deletions manager/job/preheat.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"github.com/distribution/distribution/v3/manifest/schema2"
"github.com/go-http-utils/headers"
"github.com/google/uuid"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"

logger "d7y.io/dragonfly/v2/internal/dflog"
Expand All @@ -47,8 +46,6 @@ import (
nethttp "d7y.io/dragonfly/v2/pkg/net/http"
)

var tracer = otel.Tracer("manager")

type PreheatType string

const (
Expand All @@ -59,29 +56,36 @@ const (
PreheatFileType PreheatType = "file"
)

// accessURLPattern is the pattern of access url.
var accessURLPattern, _ = regexp.Compile("^(.*)://(.*)/v2/(.*)/manifests/(.*)")

// Preheat is an interface for preheat job.
type Preheat interface {
// CreatePreheat creates a preheat job.
CreatePreheat(context.Context, []models.Scheduler, types.PreheatArgs) (*internaljob.GroupJobState, error)
}

// preheat is an implementation of Preheat.
type preheat struct {
job *internaljob.Job
httpRequestTimeout time.Duration
rootCAs *x509.CertPool
}

// preheatImage is image information for preheat.
type preheatImage struct {
protocol string
domain string
name string
tag string
}

// newPreheat creates a new Preheat.
func newPreheat(job *internaljob.Job, httpRequestTimeout time.Duration, rootCAs *x509.CertPool) (Preheat, error) {
return &preheat{job, httpRequestTimeout, rootCAs}, nil
}

// CreatePreheat creates a preheat job.
func (p *preheat) CreatePreheat(ctx context.Context, schedulers []models.Scheduler, json types.PreheatArgs) (*internaljob.GroupJobState, error) {
var span trace.Span
ctx, span = tracer.Start(ctx, config.SpanPreheat, trace.WithSpanKind(trace.SpanKindProducer))
Expand Down Expand Up @@ -127,6 +131,7 @@ func (p *preheat) CreatePreheat(ctx context.Context, schedulers []models.Schedul
return p.createGroupJob(ctx, files, queues)
}

// createGroupJob creates a group job.
func (p *preheat) createGroupJob(ctx context.Context, files []internaljob.PreheatRequest, queues []internaljob.Queue) (*internaljob.GroupJobState, error) {
var signatures []*machineryv1tasks.Signature
for _, queue := range queues {
Expand Down Expand Up @@ -169,6 +174,7 @@ func (p *preheat) createGroupJob(ctx context.Context, files []internaljob.Prehea
}, nil
}

// getLayers gets layers of image.
func (p *preheat) getLayers(ctx context.Context, url, tag, filter string, header http.Header, image *preheatImage) ([]internaljob.PreheatRequest, error) {
ctx, span := tracer.Start(ctx, config.SpanGetLayers, trace.WithSpanKind(trace.SpanKindProducer))
defer span.End()
Expand Down Expand Up @@ -204,6 +210,7 @@ func (p *preheat) getLayers(ctx context.Context, url, tag, filter string, header
return layers, nil
}

// getManifests gets manifests of image.
func (p *preheat) getManifests(ctx context.Context, url string, header http.Header, timeout time.Duration) (*http.Response, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
Expand All @@ -229,6 +236,7 @@ func (p *preheat) getManifests(ctx context.Context, url string, header http.Head
return resp, nil
}

// parseLayers parses layers of image.
func (p *preheat) parseLayers(resp *http.Response, url, tag, filter string, header http.Header, image *preheatImage) ([]internaljob.PreheatRequest, error) {
body, err := io.ReadAll(resp.Body)
if err != nil {
Expand All @@ -255,6 +263,7 @@ func (p *preheat) parseLayers(resp *http.Response, url, tag, filter string, head
return layers, nil
}

// getAuthToken gets auth token from registry.
func getAuthToken(ctx context.Context, header http.Header, timeout time.Duration, rootCAs *x509.CertPool) (string, error) {
ctx, span := tracer.Start(ctx, config.SpanAuthWithRegistry, trace.WithSpanKind(trace.SpanKindProducer))
defer span.End()
Expand Down Expand Up @@ -298,6 +307,7 @@ func getAuthToken(ctx context.Context, header http.Header, timeout time.Duration
return token, nil
}

// authURL gets auth url from www-authenticate header.
func authURL(wwwAuth []string) string {
// Bearer realm="<auth-service-url>",service="<service>",scope="repository:<name>:pull"
if len(wwwAuth) == 0 {
Expand All @@ -315,10 +325,12 @@ func authURL(wwwAuth []string) string {
return fmt.Sprintf("%s?%s", host, query)
}

// layerURL gets layer url.
func layerURL(protocol string, domain string, name string, digest string) string {
return fmt.Sprintf("%s://%s/v2/%s/blobs/%s", protocol, domain, name, digest)
}

// parseAccessURL parses access url.
func parseAccessURL(url string) (*preheatImage, error) {
r := accessURLPattern.FindStringSubmatch(url)
if len(r) != 5 {
Expand All @@ -332,17 +344,3 @@ func parseAccessURL(url string) (*preheatImage, error) {
tag: r[4],
}, nil
}

func getSchedulerQueues(schedulers []models.Scheduler) []internaljob.Queue {
var queues []internaljob.Queue
for _, scheduler := range schedulers {
queue, err := internaljob.GetSchedulerQueue(scheduler.SchedulerClusterID, scheduler.Hostname)
if err != nil {
continue
}

queues = append(queues, queue)
}

return queues
}
Loading

0 comments on commit dfde8bd

Please sign in to comment.