diff --git a/scheduler/config/config.go b/scheduler/config/config.go index c4f76850033..30bcee3912c 100644 --- a/scheduler/config/config.go +++ b/scheduler/config/config.go @@ -81,6 +81,7 @@ func New() *Config { Location: "", IDC: "", NetTopology: "", + LoadLimit: 100, }, }, }, diff --git a/scheduler/config/dynconfig.go b/scheduler/config/dynconfig.go index 9e987c03ea1..cfd620aa567 100644 --- a/scheduler/config/dynconfig.go +++ b/scheduler/config/dynconfig.go @@ -52,6 +52,7 @@ type CDN struct { Location string `yaml:"location" mapstructure:"location" json:"location"` IDC string `yaml:"idc" mapstructure:"idc" json:"idc"` NetTopology string `yaml:"netTopology" mapstructure:"netTopology" json:"net_topology"` + LoadLimit int32 `yaml:"loadLimit" mapstructure:"loadLimit" json:"load_limit"` } type DynconfigInterface interface { diff --git a/scheduler/core/service.go b/scheduler/core/service.go index e8209d1b481..04597461203 100644 --- a/scheduler/core/service.go +++ b/scheduler/core/service.go @@ -32,8 +32,6 @@ import ( "d7y.io/dragonfly/v2/scheduler/core/scheduler" "d7y.io/dragonfly/v2/scheduler/supervisor" "d7y.io/dragonfly/v2/scheduler/supervisor/cdn" - "d7y.io/dragonfly/v2/scheduler/supervisor/cdn/d7y" - "d7y.io/dragonfly/v2/scheduler/supervisor/cdn/source" "d7y.io/dragonfly/v2/scheduler/supervisor/host" "d7y.io/dragonfly/v2/scheduler/supervisor/peer" "d7y.io/dragonfly/v2/scheduler/supervisor/task" @@ -65,31 +63,20 @@ type SchedulerService struct { } func NewSchedulerService(cfg *config.SchedulerConfig, dynConfig config.DynconfigInterface, openTel bool) (*SchedulerService, error) { - dynConfigData, err := dynConfig.Get() - if err != nil { - return nil, err - } hostManager := host.NewManager() peerManager := peer.NewManager(cfg.GC, hostManager) - var cdnManager supervisor.CDNMgr - if cfg.DisableCDN { - if cdnManager, err = source.NewManager(peerManager, hostManager); err != nil { - return nil, errors.Wrap(err, "new back source cdn manager") - } - } else { - var opts []grpc.DialOption - if openTel { - opts = append(opts, grpc.WithChainUnaryInterceptor(otelgrpc.UnaryClientInterceptor()), grpc.WithChainStreamInterceptor(otelgrpc.StreamClientInterceptor())) - } - cdnClient, err := cdn.NewRefreshableCDNClient(dynConfig, opts) - if err != nil { - return nil, errors.Wrap(err, "new refreshable cdn client") - } - if cdnManager, err = d7y.NewManager(cdnClient, peerManager, hostManager); err != nil { - return nil, errors.Wrap(err, "new cdn manager") - } - hostManager.OnNotify(dynConfigData) - dynConfig.Register(hostManager) + + var opts []grpc.DialOption + if openTel { + opts = append(opts, grpc.WithChainUnaryInterceptor(otelgrpc.UnaryClientInterceptor()), grpc.WithChainStreamInterceptor(otelgrpc.StreamClientInterceptor())) + } + cdnClient, err := cdn.NewRefreshableCDNClient(dynConfig, opts) + if err != nil { + return nil, errors.Wrap(err, "new refreshable cdn client") + } + cdnManager, err := cdn.NewManager(cdnClient, peerManager, hostManager) + if err != nil { + return nil, errors.Wrap(err, "new cdn manager") } taskManager := task.NewManager(cfg.GC, peerManager) sched, err := scheduler.Get(cfg.Scheduler).Build(cfg, &scheduler.BuildOptions{ diff --git a/scheduler/supervisor/cdn/error.go b/scheduler/supervisor/cdn/error.go deleted file mode 100644 index a06e454e678..00000000000 --- a/scheduler/supervisor/cdn/error.go +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2020 The Dragonfly Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cdn - -import "github.com/pkg/errors" - -var ( - ErrCDNRegisterFail = errors.New("cdn task register failed") - - ErrCDNDownloadFail = errors.New("cdn task download failed") - - ErrCDNUnknown = errors.New("cdn obtain seed encounter unknown err") - - ErrCDNInvokeFail = errors.New("invoke cdn interface failed") - - ErrInitCDNPeerFail = errors.New("init cdn peer failed") -) diff --git a/scheduler/supervisor/cdn/d7y/manager.go b/scheduler/supervisor/cdn/manager.go similarity index 81% rename from scheduler/supervisor/cdn/d7y/manager.go rename to scheduler/supervisor/cdn/manager.go index 962243823c4..23cc12637f7 100644 --- a/scheduler/supervisor/cdn/d7y/manager.go +++ b/scheduler/supervisor/cdn/manager.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package d7y +package cdn import ( "context" @@ -31,12 +31,23 @@ import ( "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem/client" "d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/supervisor" - "d7y.io/dragonfly/v2/scheduler/supervisor/cdn" "github.com/pkg/errors" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/trace" ) +var ( + ErrCDNRegisterFail = errors.New("cdn task register failed") + + ErrCDNDownloadFail = errors.New("cdn task download failed") + + ErrCDNUnknown = errors.New("cdn obtain seed encounter unknown err") + + ErrCDNInvokeFail = errors.New("invoke cdn interface failed") + + ErrInitCDNPeerFail = errors.New("init cdn peer failed") +) + var tracer trace.Tracer func init() { @@ -44,13 +55,13 @@ func init() { } type manager struct { - client client.CdnClient + client RefreshableCDNClient peerManager supervisor.PeerMgr hostManager supervisor.HostMgr lock sync.RWMutex } -func NewManager(cdnClient client.CdnClient, peerManager supervisor.PeerMgr, hostManager supervisor.HostMgr) (supervisor.CDNMgr, error) { +func NewManager(cdnClient RefreshableCDNClient, peerManager supervisor.PeerMgr, hostManager supervisor.HostMgr) (supervisor.CDNMgr, error) { mgr := &manager{ client: cdnClient, peerManager: peerManager, @@ -70,7 +81,7 @@ func (cm *manager) StartSeedTask(ctx context.Context, task *supervisor.Task) (*s } seedSpan.SetAttributes(config.AttributeCDNSeedRequest.String(seedRequest.String())) if cm.client == nil { - err := cdn.ErrCDNRegisterFail + err := ErrCDNRegisterFail seedSpan.RecordError(err) seedSpan.SetAttributes(config.AttributePeerDownloadSuccess.Bool(false)) return nil, err @@ -83,14 +94,14 @@ func (cm *manager) StartSeedTask(ctx context.Context, task *supervisor.Task) (*s logger.Errorf("failed to obtain cdn seed: %v", cdnErr) switch cdnErr.Code { case dfcodes.CdnTaskRegistryFail: - return nil, errors.Wrap(cdn.ErrCDNRegisterFail, "obtain seeds") + return nil, errors.Wrap(ErrCDNRegisterFail, "obtain seeds") case dfcodes.CdnTaskDownloadFail: - return nil, errors.Wrapf(cdn.ErrCDNDownloadFail, "obtain seeds") + return nil, errors.Wrapf(ErrCDNDownloadFail, "obtain seeds") default: - return nil, errors.Wrapf(cdn.ErrCDNUnknown, "obtain seeds") + return nil, errors.Wrapf(ErrCDNUnknown, "obtain seeds") } } - return nil, errors.Wrapf(cdn.ErrCDNInvokeFail, "obtain seeds from cdn: %v", err) + return nil, errors.Wrapf(ErrCDNInvokeFail, "obtain seeds from cdn: %v", err) } return cm.receivePiece(ctx, task, stream) } @@ -115,14 +126,14 @@ func (cm *manager) receivePiece(ctx context.Context, task *supervisor.Task, stre span.RecordError(recvErr) switch recvErr.Code { case dfcodes.CdnTaskRegistryFail: - return cdnPeer, errors.Wrapf(cdn.ErrCDNRegisterFail, "receive piece") + return cdnPeer, errors.Wrapf(ErrCDNRegisterFail, "receive piece") case dfcodes.CdnTaskDownloadFail: - return cdnPeer, errors.Wrapf(cdn.ErrCDNDownloadFail, "receive piece") + return cdnPeer, errors.Wrapf(ErrCDNDownloadFail, "receive piece") default: - return cdnPeer, errors.Wrapf(cdn.ErrCDNUnknown, "recive piece") + return cdnPeer, errors.Wrapf(ErrCDNUnknown, "recive piece") } } - return cdnPeer, errors.Wrapf(cdn.ErrCDNInvokeFail, "receive piece from cdn: %v", err) + return cdnPeer, errors.Wrapf(ErrCDNInvokeFail, "receive piece from cdn: %v", err) } if piece != nil { span.AddEvent(config.EventPieceReceived, trace.WithAttributes(config.AttributePieceReceived.String(piece.String()))) @@ -170,10 +181,12 @@ func (cm *manager) initCdnPeer(ctx context.Context, task *supervisor.Task, ps *c var cdnHost *supervisor.PeerHost cdnPeer, ok := cm.peerManager.Get(ps.PeerId) if !ok { - logger.Debugf("first seed cdn task for taskID %s", task.TaskID) if cdnHost, ok = cm.hostManager.Get(ps.HostUuid); !ok { - logger.Errorf("cannot find host %s", ps.HostUuid) - return nil, errors.Wrapf(cdn.ErrInitCDNPeerFail, "cannot find host %s", ps.HostUuid) + if cdnHost, ok = cm.client.GetCDNHost(ps.HostUuid); !ok { + logger.Errorf("cannot find cdn host %s", ps.HostUuid) + return nil, errors.Wrapf(ErrInitCDNPeerFail, "cannot find host %s", ps.HostUuid) + } + cm.hostManager.Add(cdnHost) } cdnPeer = supervisor.NewPeer(ps.PeerId, task, cdnHost) } diff --git a/scheduler/supervisor/cdn/reloadable_cdn_client.go b/scheduler/supervisor/cdn/reloadable_cdn_client.go index 83fd647661d..961d137230f 100644 --- a/scheduler/supervisor/cdn/reloadable_cdn_client.go +++ b/scheduler/supervisor/cdn/reloadable_cdn_client.go @@ -22,23 +22,30 @@ import ( "reflect" "sync" + "d7y.io/dragonfly/v2/internal/idgen" "d7y.io/dragonfly/v2/pkg/basic/dfnet" "d7y.io/dragonfly/v2/pkg/rpc/base" "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem" cdnclient "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem/client" "d7y.io/dragonfly/v2/scheduler/config" + "d7y.io/dragonfly/v2/scheduler/supervisor" "google.golang.org/grpc" ) type RefreshableCDNClient interface { cdnclient.CdnClient config.Observer + GetCDNHost(hostID string) (*supervisor.PeerHost, bool) } type refreshableCDNClient struct { mu sync.RWMutex cdnClient cdnclient.CdnClient - cdnAddrs []dfnet.NetAddr + cdnHosts map[string]*supervisor.PeerHost +} + +func (rcc *refreshableCDNClient) UpdateState(addrs []dfnet.NetAddr) { + rcc.cdnClient.UpdateState(addrs) } func (rcc *refreshableCDNClient) ObtainSeeds(ctx context.Context, sr *cdnsystem.SeedRequest, opts ...grpc.CallOption) (*cdnclient.PieceSeedStream, error) { @@ -49,8 +56,13 @@ func (rcc *refreshableCDNClient) GetPieceTasks(ctx context.Context, addr dfnet.N return rcc.cdnClient.GetPieceTasks(ctx, addr, req, opts...) } -func (rcc *refreshableCDNClient) UpdateState(addrs []dfnet.NetAddr) { - rcc.cdnClient.UpdateState(addrs) +func (rcc *refreshableCDNClient) GetCDNHost(hostID string) (*supervisor.PeerHost, bool) { + rcc.mu.RLock() + defer rcc.mu.RUnlock() + if cdnHost, ok := rcc.cdnHosts[hostID]; ok { + return cdnHost, true + } + return nil, false } func (rcc *refreshableCDNClient) Close() error { @@ -62,43 +74,47 @@ func NewRefreshableCDNClient(dynConfig config.DynconfigInterface, opts []grpc.Di if err != nil { return nil, err } - cdnAddrs := cdnHostsToNetAddrs(dynConfigData.CDNs) + cdnHosts, cdnAddrs := cdnHostsToNetAddrs(dynConfigData.CDNs) cdnClient, err := cdnclient.GetClientByAddr(cdnAddrs, opts...) if err != nil { return nil, err } rcc := &refreshableCDNClient{ cdnClient: cdnClient, - cdnAddrs: cdnAddrs, + cdnHosts: cdnHosts, } dynConfig.Register(rcc) return rcc, nil } func (rcc *refreshableCDNClient) OnNotify(c *config.DynconfigData) { - netAddrs := cdnHostsToNetAddrs(c.CDNs) - rcc.refresh(netAddrs) + rcc.refresh(c.CDNs) } -func (rcc *refreshableCDNClient) refresh(netAddrs []dfnet.NetAddr) { +func (rcc *refreshableCDNClient) refresh(cdns []*config.CDN) { rcc.mu.Lock() defer rcc.mu.Unlock() - - if reflect.DeepEqual(netAddrs, rcc.cdnAddrs) { + cdnHosts, netAddrs := cdnHostsToNetAddrs(cdns) + if reflect.DeepEqual(rcc.cdnHosts, cdnHosts) { return } + rcc.cdnHosts = cdnHosts // Sync CDNManager client netAddrs rcc.cdnClient.UpdateState(netAddrs) } // cdnHostsToNetAddrs coverts manager.CdnHosts to []dfnet.NetAddr. -func cdnHostsToNetAddrs(hosts []*config.CDN) []dfnet.NetAddr { - var netAddrs []dfnet.NetAddr - for i := range hosts { +func cdnHostsToNetAddrs(hosts []*config.CDN) (map[string]*supervisor.PeerHost, []dfnet.NetAddr) { + cdnHostMap := make(map[string]*supervisor.PeerHost, len(hosts)) + netAddrs := make([]dfnet.NetAddr, 0, len(hosts)) + for _, host := range hosts { + hostID := idgen.CDNUUID(host.HostName, host.Port) + cdnHostMap[hostID] = supervisor.NewCDNPeerHost(hostID, host.IP, host.HostName, host.Port, host.DownloadPort, host.SecurityGroup, host.Location, + host.IDC, host.NetTopology, host.LoadLimit) netAddrs = append(netAddrs, dfnet.NetAddr{ Type: dfnet.TCP, - Addr: fmt.Sprintf("%s:%d", hosts[i].IP, hosts[i].Port), + Addr: fmt.Sprintf("%s:%d", host.HostName, host.Port), }) } - return netAddrs + return cdnHostMap, netAddrs } diff --git a/scheduler/supervisor/cdn/source/manager.go b/scheduler/supervisor/cdn/source/manager.go deleted file mode 100644 index 7470c3a4297..00000000000 --- a/scheduler/supervisor/cdn/source/manager.go +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright 2020 The Dragonfly Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package source - -import ( - "context" - - "d7y.io/dragonfly/v2/scheduler/config" - "d7y.io/dragonfly/v2/scheduler/supervisor" -) - -type manager struct { - peerManager supervisor.PeerMgr - hostManager supervisor.HostMgr -} - -func NewManager(peerManager supervisor.PeerMgr, hostManager supervisor.HostMgr) (supervisor.CDNMgr, error) { - mgr := &manager{ - peerManager: peerManager, - hostManager: hostManager, - } - return mgr, nil -} - -func (m manager) OnNotify(data *config.DynconfigData) { - panic("implement me") -} - -func (m manager) StartSeedTask(ctx context.Context, task *supervisor.Task) (*supervisor.Peer, error) { - //stream, err := cm.client.ObtainSeeds(context.Background(), &cdnsystem.SeedRequest{ - // TaskId: task.TaskID, - // Url: task.URL, - // UrlMeta: task.URLMeta, - //}) - //if err != nil { - // if cdnErr, ok := err.(*dferrors.DfError); ok { - // logger.Errorf("failed to obtain cdn seed: %v", cdnErr) - // switch cdnErr.Code { - // case dfcodes.CdnTaskRegistryFail: - // return errors.Wrap(cdn.ErrCDNRegisterFail, "obtain seeds") - // case dfcodes.CdnTaskDownloadFail: - // return errors.Wrapf(cdn.ErrCDNDownloadFail, "obtain seeds") - // default: - // return errors.Wrapf(cdn.ErrCDNUnknown, "obtain seeds") - // } - // } - // return errors.Wrapf(cdn.ErrCDNInvokeFail, "obtain seeds from cdn: %v", err) - //} - //return cm.receivePiece(task, stream) - //source.GetContentLength(context.Background(), task.URL, nil) - //task.ListPeers() - //task.SetStatus(types.TaskStatusSuccess) - return nil, nil -} - -var _ supervisor.CDNMgr = (*manager)(nil) diff --git a/scheduler/supervisor/cdn_mgr.go b/scheduler/supervisor/cdn_mgr.go index 035d45c471c..b3dc48ee2d4 100644 --- a/scheduler/supervisor/cdn_mgr.go +++ b/scheduler/supervisor/cdn_mgr.go @@ -21,6 +21,7 @@ import ( ) type CDNMgr interface { + // StartSeedTask start seed cdn task StartSeedTask(ctx context.Context, task *Task) (*Peer, error) } diff --git a/scheduler/supervisor/host/manager.go b/scheduler/supervisor/host/manager.go index 00e97fd40cb..ff771585196 100644 --- a/scheduler/supervisor/host/manager.go +++ b/scheduler/supervisor/host/manager.go @@ -19,8 +19,6 @@ package host import ( "sync" - "d7y.io/dragonfly/v2/internal/idgen" - "d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/supervisor" ) @@ -49,11 +47,3 @@ func (m *manager) Get(uuid string) (*supervisor.PeerHost, bool) { } return host.(*supervisor.PeerHost), true } - -func (m *manager) OnNotify(dynconfig *config.DynconfigData) { - for _, cdn := range dynconfig.CDNs { - cdnHost := supervisor.NewCDNPeerHost(idgen.CDNUUID(cdn.HostName, cdn.Port), cdn.IP, cdn.HostName, cdn.Port, cdn.DownloadPort, cdn.SecurityGroup, - cdn.Location, cdn.IDC, cdn.NetTopology, 100) - m.hostMap.Store(cdnHost.UUID, cdnHost) - } -} diff --git a/scheduler/supervisor/host_mgr.go b/scheduler/supervisor/host_mgr.go index 6a9621393b2..823675306ee 100644 --- a/scheduler/supervisor/host_mgr.go +++ b/scheduler/supervisor/host_mgr.go @@ -16,13 +16,7 @@ package supervisor -import ( - "d7y.io/dragonfly/v2/scheduler/config" -) - type HostMgr interface { - config.Observer - Add(host *PeerHost) Delete(uuid string) diff --git a/scheduler/supervisor/peer.go b/scheduler/supervisor/peer.go index 969d642c30e..d955aaf4888 100644 --- a/scheduler/supervisor/peer.go +++ b/scheduler/supervisor/peer.go @@ -116,13 +116,13 @@ func (peer *Peer) Touch() { func (peer *Peer) associateChild(child *Peer) { peer.children.Store(child.PeerID, child) peer.Host.IncUploadLoad() - peer.Task.peers.Update(peer) + peer.Task.UpdatePeer(peer) } func (peer *Peer) disassociateChild(child *Peer) { peer.children.Delete(child.PeerID) peer.Host.DecUploadLoad() - peer.Task.peers.Update(peer) + peer.Task.UpdatePeer(peer) } func (peer *Peer) ReplaceParent(parent *Peer) { @@ -164,7 +164,7 @@ func (peer *Peer) AddPieceInfo(finishedCount int32, cost int) { peer.costHistory = peer.costHistory[len(peer.costHistory)-20:] } peer.lock.Unlock() - peer.Task.peers.Update(peer) + peer.Task.UpdatePeer(peer) return } peer.lock.Unlock() diff --git a/scheduler/supervisor/task.go b/scheduler/supervisor/task.go index b3fc6096f90..38e6dc0c7f8 100644 --- a/scheduler/supervisor/task.go +++ b/scheduler/supervisor/task.go @@ -90,6 +90,10 @@ func (task *Task) AddPeer(peer *Peer) { task.peers.UpdateOrAdd(peer) } +func (task *Task) UpdatePeer(peer *Peer) { + task.peers.Update(peer) +} + func (task *Task) DeletePeer(peer *Peer) { task.peers.Delete(peer) }