Skip to content

Commit

Permalink
feat: download tiny file with https scheme (#2617)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Aug 10, 2023
1 parent afa665c commit a5b0f74
Show file tree
Hide file tree
Showing 26 changed files with 445 additions and 249 deletions.
15 changes: 12 additions & 3 deletions manager/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,11 +290,14 @@ type JobConfig struct {
}

type PreheatConfig struct {
// RegistryTimeout is the timeout for requesting registry to get token and manifest.
RegistryTimeout time.Duration `yaml:"registryTimeout" mapstructure:"registryTimeout"`

// TLS client configuration.
TLS *TLSClientConfig `yaml:"tls" mapstructure:"tls"`
TLS *PreheatTLSClientConfig `yaml:"tls" mapstructure:"tls"`
}

type TLSClientConfig struct {
type PreheatTLSClientConfig struct {
// CACert is the CA certificate for preheat tls handshake, it can be path or PEM format string.
CACert types.PEMContent `yaml:"caCert" mapstructure:"caCert"`
}
Expand Down Expand Up @@ -424,7 +427,9 @@ func New() *Config {
},
},
Job: JobConfig{
Preheat: PreheatConfig{},
Preheat: PreheatConfig{
RegistryTimeout: DefaultJobPreheatRegistryTimeout,
},
},
ObjectStorage: ObjectStorageConfig{
Enable: false,
Expand Down Expand Up @@ -602,6 +607,10 @@ func (cfg *Config) Validate() error {
}
}

if cfg.Job.Preheat.RegistryTimeout == 0 {
return errors.New("preheat requires parameter registryTimeout")
}

if cfg.ObjectStorage.Enable {
if cfg.ObjectStorage.Name == "" {
return errors.New("objectStorage requires parameter name")
Expand Down
20 changes: 18 additions & 2 deletions manager/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ func TestConfig_Load(t *testing.T) {
},
Job: JobConfig{
Preheat: PreheatConfig{
TLS: &TLSClientConfig{
RegistryTimeout: DefaultJobPreheatRegistryTimeout,
TLS: &PreheatTLSClientConfig{
CACert: "foo",
},
},
Expand Down Expand Up @@ -718,7 +719,7 @@ func TestConfig_Validate(t *testing.T) {
cfg.Database.Type = DatabaseTypeMysql
cfg.Database.Mysql = mockMysqlConfig
cfg.Database.Redis = mockRedisConfig
cfg.Job.Preheat.TLS = &TLSClientConfig{
cfg.Job.Preheat.TLS = &PreheatTLSClientConfig{
CACert: "",
}
},
Expand All @@ -727,6 +728,21 @@ func TestConfig_Validate(t *testing.T) {
assert.EqualError(err, "preheat requires parameter caCert")
},
},
{
name: "preheat requires parameter registryTimeout",
config: New(),
mock: func(cfg *Config) {
cfg.Auth.JWT = mockJWTConfig
cfg.Database.Type = DatabaseTypeMysql
cfg.Database.Mysql = mockMysqlConfig
cfg.Database.Redis = mockRedisConfig
cfg.Job.Preheat.RegistryTimeout = 0
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.EqualError(err, "preheat requires parameter registryTimeout")
},
},
{
name: "objectStorage requires parameter name",
config: New(),
Expand Down
5 changes: 5 additions & 0 deletions manager/config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ const (
DefaultMysqlDBName = "manager"
)

const (
// DefaultJobPreheatRegistryTimeout is the default timeout for requesting registry to get token and manifest.
DefaultJobPreheatRegistryTimeout = 1 * time.Minute
)

const (
// DefaultPostgresPort is default port for postgres.
DefaultPostgresPort = 5432
Expand Down
1 change: 1 addition & 0 deletions manager/config/testdata/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ cache:

job:
preheat:
registryTimeout: 1m
tls:
caCert: testdata/ca.crt
objectStorage:
Expand Down
2 changes: 1 addition & 1 deletion manager/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func New(cfg *config.Config) (*Job, error) {
}
}

p, err := newPreheat(j, certPool)
p, err := newPreheat(j, cfg.Job.Preheat.RegistryTimeout, certPool)
if err != nil {
return nil, err
}
Expand Down
28 changes: 12 additions & 16 deletions manager/job/preheat.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,16 @@ const (
PreheatFileType PreheatType = "file"
)

const (
// defaultHTTPRequesttimeout is the default timeout of http client.
defaultHTTPRequesttimeout = 1 * time.Minute
)

var accessURLPattern, _ = regexp.Compile("^(.*)://(.*)/v2/(.*)/manifests/(.*)")

type Preheat interface {
CreatePreheat(context.Context, []models.Scheduler, types.PreheatArgs) (*internaljob.GroupJobState, error)
}

type preheat struct {
job *internaljob.Job
rootCAs *x509.CertPool
job *internaljob.Job
httpRequestTimeout time.Duration
rootCAs *x509.CertPool
}

type preheatImage struct {
Expand All @@ -82,8 +78,8 @@ type preheatImage struct {
tag string
}

func newPreheat(job *internaljob.Job, rootCAs *x509.CertPool) (Preheat, error) {
return &preheat{job, rootCAs}, nil
func newPreheat(job *internaljob.Job, httpRequestTimeout time.Duration, rootCAs *x509.CertPool) (Preheat, error) {
return &preheat{job, httpRequestTimeout, rootCAs}, nil
}

func (p *preheat) CreatePreheat(ctx context.Context, schedulers []models.Scheduler, json types.PreheatArgs) (*internaljob.GroupJobState, error) {
Expand Down Expand Up @@ -177,21 +173,21 @@ func (p *preheat) getLayers(ctx context.Context, url, tag, filter string, header
ctx, span := tracer.Start(ctx, config.SpanGetLayers, trace.WithSpanKind(trace.SpanKindProducer))
defer span.End()

resp, err := p.getManifests(ctx, url, header)
resp, err := p.getManifests(ctx, url, header, p.httpRequestTimeout)
if err != nil {
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode/100 != 2 {
if resp.StatusCode == http.StatusUnauthorized {
token, err := getAuthToken(ctx, resp.Header, p.rootCAs)
token, err := getAuthToken(ctx, resp.Header, p.httpRequestTimeout, p.rootCAs)
if err != nil {
return nil, err
}

header.Add(headers.Authorization, fmt.Sprintf("Bearer %s", token))
resp, err = p.getManifests(ctx, url, header)
resp, err = p.getManifests(ctx, url, header, p.httpRequestTimeout)
if err != nil {
return nil, err
}
Expand All @@ -208,7 +204,7 @@ func (p *preheat) getLayers(ctx context.Context, url, tag, filter string, header
return layers, nil
}

func (p *preheat) getManifests(ctx context.Context, url string, header http.Header) (*http.Response, error) {
func (p *preheat) getManifests(ctx context.Context, url string, header http.Header, timeout time.Duration) (*http.Response, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, err
Expand All @@ -218,7 +214,7 @@ func (p *preheat) getManifests(ctx context.Context, url string, header http.Head
req.Header.Add(headers.Accept, schema2.MediaTypeManifest)

client := &http.Client{
Timeout: defaultHTTPRequesttimeout,
Timeout: timeout,
Transport: &http.Transport{
DialContext: nethttp.NewSafeDialer().DialContext,
TLSClientConfig: &tls.Config{RootCAs: p.rootCAs},
Expand Down Expand Up @@ -259,7 +255,7 @@ func (p *preheat) parseLayers(resp *http.Response, url, tag, filter string, head
return layers, nil
}

func getAuthToken(ctx context.Context, header http.Header, rootCAs *x509.CertPool) (string, error) {
func getAuthToken(ctx context.Context, header http.Header, timeout time.Duration, rootCAs *x509.CertPool) (string, error) {
ctx, span := tracer.Start(ctx, config.SpanAuthWithRegistry, trace.WithSpanKind(trace.SpanKindProducer))
defer span.End()

Expand All @@ -274,7 +270,7 @@ func getAuthToken(ctx context.Context, header http.Header, rootCAs *x509.CertPoo
}

client := &http.Client{
Timeout: defaultHTTPRequesttimeout,
Timeout: timeout,
Transport: &http.Transport{
DialContext: nethttp.NewSafeDialer().DialContext,
TLSClientConfig: &tls.Config{RootCAs: rootCAs},
Expand Down
103 changes: 76 additions & 27 deletions scheduler/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,18 @@ type Config struct {
// Base options.
base.Options `yaml:",inline" mapstructure:",squash"`

// Scheduler configuration.
Scheduler SchedulerConfig `yaml:"scheduler" mapstructure:"scheduler"`

// Server configuration.
Server ServerConfig `yaml:"server" mapstructure:"server"`

// Scheduler configuration.
Scheduler SchedulerConfig `yaml:"scheduler" mapstructure:"scheduler"`

// Database configuration.
Database DatabaseConfig `yaml:"database" mapstructure:"database"`

// Resource configuration.
Resource ResourceConfig `yaml:"resource" mapstructure:"resource"`

// Dynconfig configuration.
DynConfig DynConfig `yaml:"dynConfig" mapstructure:"dynConfig"`

Expand Down Expand Up @@ -109,11 +112,6 @@ type ServerConfig struct {
DataDir string `yaml:"dataDir" mapstructure:"dataDir"`
}

type DatabaseConfig struct {
// Redis configuration.
Redis RedisConfig `yaml:"redis" mapstructure:"redis"`
}

type SchedulerConfig struct {
// Algorithm is scheduling algorithm used by the scheduler.
Algorithm string `yaml:"algorithm" mapstructure:"algorithm"`
Expand All @@ -134,6 +132,38 @@ type SchedulerConfig struct {
GC GCConfig `yaml:"gc" mapstructure:"gc"`
}

type DatabaseConfig struct {
// Redis configuration.
Redis RedisConfig `yaml:"redis" mapstructure:"redis"`
}

type ResourceConfig struct {
// Task resource configuration.
Task TaskConfig `yaml:"task" mapstructure:"task"`
}

type TaskConfig struct {
// Download tiny task configuration.
DownloadTiny DownloadTinyConfig `yaml:"downloadTiny" mapstructure:"downloadTiny"`
}

type DownloadTinyConfig struct {
// Scheme is download tiny task scheme.
Scheme string `yaml:"scheme" mapstructure:"scheme"`

// Timeout is http request timeout.
Timeout time.Duration `yaml:"timeout" mapstructure:"timeout"`

// TLS is download tiny task TLS configuration.
TLS DownloadTinyTLSClientConfig `yaml:"tls" mapstructure:"tls"`
}

type DownloadTinyTLSClientConfig struct {
// InsecureSkipVerify controls whether a client verifies the
// server's certificate chain and host name.
InsecureSkipVerify bool `yaml:"insecureSkipVerify" mapstructure:"insecureSkipVerify"`
}

type GCConfig struct {
// PieceDownloadTimeout is timeout of downloading piece.
PieceDownloadTimeout time.Duration `yaml:"pieceDownloadTimeout" mapstructure:"pieceDownloadTimeout"`
Expand Down Expand Up @@ -338,13 +368,6 @@ func New() *Config {
AdvertisePort: DefaultServerAdvertisePort,
Host: fqdn.FQDNHostname,
},
Database: DatabaseConfig{
Redis: RedisConfig{
BrokerDB: DefaultRedisBrokerDB,
BackendDB: DefaultRedisBackendDB,
NetworkTopologyDB: DefaultNetworkTopologyDB,
},
},
Scheduler: SchedulerConfig{
Algorithm: DefaultSchedulerAlgorithm,
BackToSourceCount: DefaultSchedulerBackToSourceCount,
Expand All @@ -360,6 +383,24 @@ func New() *Config {
HostTTL: DefaultSchedulerHostTTL,
},
},
Database: DatabaseConfig{
Redis: RedisConfig{
BrokerDB: DefaultRedisBrokerDB,
BackendDB: DefaultRedisBackendDB,
NetworkTopologyDB: DefaultNetworkTopologyDB,
},
},
Resource: ResourceConfig{
Task: TaskConfig{
DownloadTiny: DownloadTinyConfig{
Scheme: DefaultResourceTaskDownloadTinyScheme,
Timeout: DefaultResourceTaskDownloadTinyTimeout,
TLS: DownloadTinyTLSClientConfig{
InsecureSkipVerify: true,
},
},
},
},
DynConfig: DynConfig{
RefreshInterval: DefaultDynConfigRefreshInterval,
},
Expand Down Expand Up @@ -441,18 +482,6 @@ func (cfg *Config) Validate() error {
return errors.New("server requires parameter host")
}

if cfg.Database.Redis.BrokerDB < 0 {
return errors.New("redis requires parameter brokerDB")
}

if cfg.Database.Redis.BackendDB < 0 {
return errors.New("redis requires parameter backendDB")
}

if cfg.Database.Redis.NetworkTopologyDB < 0 {
return errors.New("redis requires parameter networkTopologyDB")
}

if cfg.Scheduler.Algorithm == "" {
return errors.New("scheduler requires parameter algorithm")
}
Expand Down Expand Up @@ -497,6 +526,26 @@ func (cfg *Config) Validate() error {
return errors.New("scheduler requires parameter hostTTL")
}

if cfg.Database.Redis.BrokerDB < 0 {
return errors.New("redis requires parameter brokerDB")
}

if cfg.Database.Redis.BackendDB < 0 {
return errors.New("redis requires parameter backendDB")
}

if cfg.Database.Redis.NetworkTopologyDB < 0 {
return errors.New("redis requires parameter networkTopologyDB")
}

if !slices.Contains([]string{"http", "https"}, cfg.Resource.Task.DownloadTiny.Scheme) {
return errors.New("downloadTiny requires parameter scheme")
}

if cfg.Resource.Task.DownloadTiny.Timeout == 0 {
return errors.New("downloadTiny requires parameter timeout")
}

if cfg.DynConfig.RefreshInterval <= 0 {
return errors.New("dynconfig requires parameter refreshInterval")
}
Expand Down
Loading

0 comments on commit a5b0f74

Please sign in to comment.