Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reload CDN client #566

Merged
merged 4 commits into from
Aug 19, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions scheduler/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func New() *Config {
Location: "",
IDC: "",
NetTopology: "",
LoadLimit: 100,
},
},
},
Expand Down
1 change: 1 addition & 0 deletions scheduler/config/dynconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type CDN struct {
Location string `yaml:"location" mapstructure:"location" json:"location"`
IDC string `yaml:"idc" mapstructure:"idc" json:"idc"`
NetTopology string `yaml:"netTopology" mapstructure:"netTopology" json:"net_topology"`
LoadLimit int32 `yaml:"loadLimit" mapstructure:"loadLimit" json:"load_limit"`
}

type DynconfigInterface interface {
Expand Down
37 changes: 12 additions & 25 deletions scheduler/core/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ import (
"d7y.io/dragonfly/v2/scheduler/core/scheduler"
"d7y.io/dragonfly/v2/scheduler/supervisor"
"d7y.io/dragonfly/v2/scheduler/supervisor/cdn"
"d7y.io/dragonfly/v2/scheduler/supervisor/cdn/d7y"
"d7y.io/dragonfly/v2/scheduler/supervisor/cdn/source"
"d7y.io/dragonfly/v2/scheduler/supervisor/host"
"d7y.io/dragonfly/v2/scheduler/supervisor/peer"
"d7y.io/dragonfly/v2/scheduler/supervisor/task"
Expand Down Expand Up @@ -65,31 +63,20 @@ type SchedulerService struct {
}

func NewSchedulerService(cfg *config.SchedulerConfig, dynConfig config.DynconfigInterface, openTel bool) (*SchedulerService, error) {
dynConfigData, err := dynConfig.Get()
if err != nil {
return nil, err
}
hostManager := host.NewManager()
peerManager := peer.NewManager(cfg.GC, hostManager)
var cdnManager supervisor.CDNMgr
if cfg.DisableCDN {
if cdnManager, err = source.NewManager(peerManager, hostManager); err != nil {
return nil, errors.Wrap(err, "new back source cdn manager")
}
} else {
var opts []grpc.DialOption
if openTel {
opts = append(opts, grpc.WithChainUnaryInterceptor(otelgrpc.UnaryClientInterceptor()), grpc.WithChainStreamInterceptor(otelgrpc.StreamClientInterceptor()))
}
cdnClient, err := cdn.NewRefreshableCDNClient(dynConfig, opts)
if err != nil {
return nil, errors.Wrap(err, "new refreshable cdn client")
}
if cdnManager, err = d7y.NewManager(cdnClient, peerManager, hostManager); err != nil {
return nil, errors.Wrap(err, "new cdn manager")
}
hostManager.OnNotify(dynConfigData)
dynConfig.Register(hostManager)

var opts []grpc.DialOption
if openTel {
opts = append(opts, grpc.WithChainUnaryInterceptor(otelgrpc.UnaryClientInterceptor()), grpc.WithChainStreamInterceptor(otelgrpc.StreamClientInterceptor()))
}
cdnClient, err := cdn.NewRefreshableCDNClient(dynConfig, opts)
if err != nil {
return nil, errors.Wrap(err, "new refreshable cdn client")
}
cdnManager, err := cdn.NewManager(cdnClient, peerManager, hostManager)
if err != nil {
return nil, errors.Wrap(err, "new cdn manager")
}
taskManager := task.NewManager(cfg.GC, peerManager)
sched, err := scheduler.Get(cfg.Scheduler).Build(cfg, &scheduler.BuildOptions{
Expand Down
12 changes: 6 additions & 6 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,12 @@ func (s *Server) Serve() error {
}()

// Serve Job
go func() {
if err := s.job.Serve(); err != nil {
logger.Fatalf("job start failed %v", err)
}
logger.Info("job start successfully")
}()
//go func() {
244372610 marked this conversation as resolved.
Show resolved Hide resolved
// if err := s.job.Serve(); err != nil {
// logger.Fatalf("job start failed %v", err)
// }
// logger.Info("job start successfully")
//}()

// Serve Keepalive
if s.managerClient != nil {
Expand Down
31 changes: 0 additions & 31 deletions scheduler/supervisor/cdn/error.go

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package d7y
package cdn

import (
"context"
Expand All @@ -31,26 +31,37 @@ import (
"d7y.io/dragonfly/v2/pkg/rpc/cdnsystem/client"
"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/supervisor"
"d7y.io/dragonfly/v2/scheduler/supervisor/cdn"
"github.com/pkg/errors"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)

var (
ErrCDNRegisterFail = errors.New("cdn task register failed")

ErrCDNDownloadFail = errors.New("cdn task download failed")

ErrCDNUnknown = errors.New("cdn obtain seed encounter unknown err")

ErrCDNInvokeFail = errors.New("invoke cdn interface failed")

ErrInitCDNPeerFail = errors.New("init cdn peer failed")
)

var tracer trace.Tracer

func init() {
tracer = otel.Tracer("scheduler-cdn")
}

type manager struct {
client client.CdnClient
client RefreshableCDNClient
peerManager supervisor.PeerMgr
hostManager supervisor.HostMgr
lock sync.RWMutex
}

func NewManager(cdnClient client.CdnClient, peerManager supervisor.PeerMgr, hostManager supervisor.HostMgr) (supervisor.CDNMgr, error) {
func NewManager(cdnClient RefreshableCDNClient, peerManager supervisor.PeerMgr, hostManager supervisor.HostMgr) (supervisor.CDNMgr, error) {
mgr := &manager{
client: cdnClient,
peerManager: peerManager,
Expand All @@ -70,7 +81,7 @@ func (cm *manager) StartSeedTask(ctx context.Context, task *supervisor.Task) (*s
}
seedSpan.SetAttributes(config.AttributeCDNSeedRequest.String(seedRequest.String()))
if cm.client == nil {
err := cdn.ErrCDNRegisterFail
err := ErrCDNRegisterFail
seedSpan.RecordError(err)
seedSpan.SetAttributes(config.AttributePeerDownloadSuccess.Bool(false))
return nil, err
Expand All @@ -83,14 +94,14 @@ func (cm *manager) StartSeedTask(ctx context.Context, task *supervisor.Task) (*s
logger.Errorf("failed to obtain cdn seed: %v", cdnErr)
switch cdnErr.Code {
case dfcodes.CdnTaskRegistryFail:
return nil, errors.Wrap(cdn.ErrCDNRegisterFail, "obtain seeds")
return nil, errors.Wrap(ErrCDNRegisterFail, "obtain seeds")
case dfcodes.CdnTaskDownloadFail:
return nil, errors.Wrapf(cdn.ErrCDNDownloadFail, "obtain seeds")
return nil, errors.Wrapf(ErrCDNDownloadFail, "obtain seeds")
default:
return nil, errors.Wrapf(cdn.ErrCDNUnknown, "obtain seeds")
return nil, errors.Wrapf(ErrCDNUnknown, "obtain seeds")
}
}
return nil, errors.Wrapf(cdn.ErrCDNInvokeFail, "obtain seeds from cdn: %v", err)
return nil, errors.Wrapf(ErrCDNInvokeFail, "obtain seeds from cdn: %v", err)
}
return cm.receivePiece(ctx, task, stream)
}
Expand All @@ -115,14 +126,14 @@ func (cm *manager) receivePiece(ctx context.Context, task *supervisor.Task, stre
span.RecordError(recvErr)
switch recvErr.Code {
case dfcodes.CdnTaskRegistryFail:
return cdnPeer, errors.Wrapf(cdn.ErrCDNRegisterFail, "receive piece")
return cdnPeer, errors.Wrapf(ErrCDNRegisterFail, "receive piece")
case dfcodes.CdnTaskDownloadFail:
return cdnPeer, errors.Wrapf(cdn.ErrCDNDownloadFail, "receive piece")
return cdnPeer, errors.Wrapf(ErrCDNDownloadFail, "receive piece")
default:
return cdnPeer, errors.Wrapf(cdn.ErrCDNUnknown, "recive piece")
return cdnPeer, errors.Wrapf(ErrCDNUnknown, "recive piece")
}
}
return cdnPeer, errors.Wrapf(cdn.ErrCDNInvokeFail, "receive piece from cdn: %v", err)
return cdnPeer, errors.Wrapf(ErrCDNInvokeFail, "receive piece from cdn: %v", err)
}
if piece != nil {
span.AddEvent(config.EventPieceReceived, trace.WithAttributes(config.AttributePieceReceived.String(piece.String())))
Expand Down Expand Up @@ -170,10 +181,12 @@ func (cm *manager) initCdnPeer(ctx context.Context, task *supervisor.Task, ps *c
var cdnHost *supervisor.PeerHost
cdnPeer, ok := cm.peerManager.Get(ps.PeerId)
if !ok {
logger.Debugf("first seed cdn task for taskID %s", task.TaskID)
if cdnHost, ok = cm.hostManager.Get(ps.HostUuid); !ok {
logger.Errorf("cannot find host %s", ps.HostUuid)
return nil, errors.Wrapf(cdn.ErrInitCDNPeerFail, "cannot find host %s", ps.HostUuid)
if cdnHost, ok = cm.client.GetCDNHost(ps.HostUuid); !ok {
logger.Errorf("cannot find cdn host %s", ps.HostUuid)
return nil, errors.Wrapf(ErrInitCDNPeerFail, "cannot find host %s", ps.HostUuid)
}
cm.hostManager.Add(cdnHost)
}
cdnPeer = supervisor.NewPeer(ps.PeerId, task, cdnHost)
}
Expand Down
51 changes: 36 additions & 15 deletions scheduler/supervisor/cdn/reloadable_cdn_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,31 @@ import (
"reflect"
"sync"

"d7y.io/dragonfly/v2/internal/idgen"
"d7y.io/dragonfly/v2/pkg/basic/dfnet"
"d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/rpc/cdnsystem"
cdnclient "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem/client"
"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/supervisor"
"google.golang.org/grpc"
)

type RefreshableCDNClient interface {
cdnclient.CdnClient
config.Observer
GetCDNHost(hostID string) (*supervisor.PeerHost, bool)
}

type refreshableCDNClient struct {
mu sync.RWMutex
cdnClient cdnclient.CdnClient
cdnAddrs []dfnet.NetAddr
cdnHosts map[string]*supervisor.PeerHost
cdnConfig []*config.CDN
}

func (rcc *refreshableCDNClient) UpdateState(addrs []dfnet.NetAddr) {
rcc.cdnClient.UpdateState(addrs)
}

func (rcc *refreshableCDNClient) ObtainSeeds(ctx context.Context, sr *cdnsystem.SeedRequest, opts ...grpc.CallOption) (*cdnclient.PieceSeedStream, error) {
Expand All @@ -49,8 +57,13 @@ func (rcc *refreshableCDNClient) GetPieceTasks(ctx context.Context, addr dfnet.N
return rcc.cdnClient.GetPieceTasks(ctx, addr, req, opts...)
}

func (rcc *refreshableCDNClient) UpdateState(addrs []dfnet.NetAddr) {
rcc.cdnClient.UpdateState(addrs)
func (rcc *refreshableCDNClient) GetCDNHost(hostID string) (*supervisor.PeerHost, bool) {
rcc.mu.RLock()
defer rcc.mu.RUnlock()
if cdnHost, ok := rcc.cdnHosts[hostID]; ok {
return cdnHost, true
}
return nil, false
}

func (rcc *refreshableCDNClient) Close() error {
Expand All @@ -62,43 +75,51 @@ func NewRefreshableCDNClient(dynConfig config.DynconfigInterface, opts []grpc.Di
if err != nil {
return nil, err
}
cdnAddrs := cdnHostsToNetAddrs(dynConfigData.CDNs)
cdnHosts, cdnAddrs := cdnHostsToNetAddrs(dynConfigData.CDNs)
cdnClient, err := cdnclient.GetClientByAddr(cdnAddrs, opts...)
if err != nil {
return nil, err
}
rcc := &refreshableCDNClient{
cdnClient: cdnClient,
cdnAddrs: cdnAddrs,
cdnHosts: cdnHosts,
cdnConfig: dynConfigData.CDNs,
}
dynConfig.Register(rcc)
return rcc, nil
}

func (rcc *refreshableCDNClient) OnNotify(c *config.DynconfigData) {
netAddrs := cdnHostsToNetAddrs(c.CDNs)
rcc.refresh(netAddrs)
rcc.refresh(c.CDNs)
}

func (rcc *refreshableCDNClient) refresh(netAddrs []dfnet.NetAddr) {
func (rcc *refreshableCDNClient) refresh(cdns []*config.CDN) {
rcc.mu.Lock()
defer rcc.mu.Unlock()

if reflect.DeepEqual(netAddrs, rcc.cdnAddrs) {
if reflect.DeepEqual(rcc.cdnConfig, cdns) {
return
}
cdnHosts, netAddrs := cdnHostsToNetAddrs(cdns)
rcc.cdnHosts = cdnHosts
// Sync CDNManager client netAddrs
rcc.cdnClient.UpdateState(netAddrs)
}

// cdnHostsToNetAddrs coverts manager.CdnHosts to []dfnet.NetAddr.
func cdnHostsToNetAddrs(hosts []*config.CDN) []dfnet.NetAddr {
var netAddrs []dfnet.NetAddr
for i := range hosts {
func cdnHostsToNetAddrs(hosts []*config.CDN) (map[string]*supervisor.PeerHost, []dfnet.NetAddr) {
cdnHostMap := make(map[string]*supervisor.PeerHost, len(hosts))
netAddrs := make([]dfnet.NetAddr, 0, len(hosts))
for _, host := range hosts {
hostID := idgen.CDNUUID(host.HostName, host.Port)
if host.LoadLimit == 0 {
host.LoadLimit = 100
}
cdnHostMap[hostID] = supervisor.NewCDNPeerHost(hostID, host.IP, host.HostName, host.Port, host.DownloadPort, host.SecurityGroup, host.Location,
host.IDC, host.NetTopology, host.LoadLimit)
netAddrs = append(netAddrs, dfnet.NetAddr{
Type: dfnet.TCP,
Addr: fmt.Sprintf("%s:%d", hosts[i].IP, hosts[i].Port),
Addr: fmt.Sprintf("%s:%d", host.IP, host.Port),
})
}
return netAddrs
return cdnHostMap, netAddrs
}
Loading