From 0cc52f5c6354133a4be82d8436af65db890315cc Mon Sep 17 00:00:00 2001
From: sunwp <244372610@qq.com>
Date: Thu, 12 Aug 2021 18:10:03 +0800
Subject: [PATCH] feat: Add distributed schedule tracer & refactor scheduler (#537)

* check same cdn list to avoid updating the client too frequently

Signed-off-by: santong
---
 cdnsystem/config/constants.go                 |   2 +-
 client/daemon/daemon.go                       |   7 +-
 client/daemon/peer/peertask_file.go           |   7 +-
 client/daemon/peer/peertask_stream.go         |   2 +-
 cmd/cdn/cmd/root.go                           |   2 +-
 cmd/dependency/base/option.go                 |   3 +-
 cmd/dependency/dependency.go                  |  14 +-
 cmd/dfget/cmd/daemon.go                       |   2 +-
 cmd/dfget/cmd/root.go                         |   2 +-
 cmd/manager/cmd/root.go                       |   2 +-
 cmd/scheduler/cmd/root.go                     |   6 +-
 go.sum                                        |   2 +
 internal/dfcodes/rpc_code.go                  |   1 -
 pkg/rpc/client.go                             |   8 +-
 .../scheduler/client/peer_packet_stream.go    |   2 +-
 pkg/rpc/server.go                             |   8 +-
 pkg/structure/sortedlist/sorted_list.go       |   1 -
 scheduler/config/config.go                    |  28 ++--
 scheduler/config/constants_otel.go            |  54 +++++++
 .../core/evaluator/basic/basic_evaluator.go   |  48 ++-----
 scheduler/core/evaluator/evaluator.go         |   2 +-
 scheduler/core/events.go                      | 133 ++++++++----------
 scheduler/core/monitor.go                     | 118 ++++------------
 .../core/scheduler/basic/basic_scheduler.go   |  70 ++++-----
 .../core/{scheduler_service.go => service.go} | 113 ++++++++++-----
 scheduler/core/worker.go                      |   5 -
 scheduler/daemon/cdn/d7y/manager.go           | 103 +++++++++-----
 scheduler/daemon/cdn/error.go                 |  14 ++
 scheduler/daemon/cdn/source/manager.go        |  39 ++++-
 scheduler/daemon/cdn_mgr.go                   |   4 +-
 scheduler/daemon/peer/manager.go              |  43 ++++--
 scheduler/daemon/peer_mgr.go                  |   4 +-
 scheduler/daemon/task/manager.go              |  20 ++-
 scheduler/daemon/task_mgr.go                  |   3 -
 scheduler/{server => }/server.go              |  13 +-
 .../server/{service => }/scheduler_server.go  |  99 ++++++++++---
 scheduler/tasks/tasks.go                      |   2 +-
 scheduler/types/host.go                       |  40 ++----
 scheduler/types/peer.go                       | 121 +++++++---------
 scheduler/types/task.go                       |  63 ++++++---
 40 files changed, 674 insertions(+), 536 deletions(-)
 create mode 100644 scheduler/config/constants_otel.go
 rename scheduler/core/{scheduler_service.go => service.go} (69%)
 rename scheduler/{server => }/server.go (91%)
 rename scheduler/server/{service => }/scheduler_server.go (64%)

diff --git a/cdnsystem/config/constants.go b/cdnsystem/config/constants.go
index dc41a4da923..6e378c4bb1e 100644
--- a/cdnsystem/config/constants.go
+++ b/cdnsystem/config/constants.go
@@ -60,7 +60,7 @@ const (
 	// DefaultTaskExpireTime when a task is not accessed within the taskExpireTime,
 	// and it will be treated to be expired.
-	DefaultTaskExpireTime = 3 * time.Minute
+	DefaultTaskExpireTime = 30 * time.Minute
 )
 
 const (
diff --git a/client/daemon/daemon.go b/client/daemon/daemon.go
index 60f27b1b9b8..de72f0c0f07 100644
--- a/client/daemon/daemon.go
+++ b/client/daemon/daemon.go
@@ -51,6 +51,7 @@ import (
 	"d7y.io/dragonfly/v2/pkg/rpc"
 	"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
 	schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
+	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
 )
 
 type Daemon interface {
@@ -96,7 +97,11 @@ func New(opt *config.DaemonOption) (Daemon, error) {
 		NetTopology: opt.Host.NetTopology,
 	}
 
-	sched, err := schedulerclient.GetClientByAddr(opt.Scheduler.NetAddrs)
+	var opts []grpc.DialOption
+	if opt.Options.Telemetry.Jaeger != "" {
+		opts = append(opts, grpc.WithChainUnaryInterceptor(otelgrpc.UnaryClientInterceptor()), grpc.WithChainStreamInterceptor(otelgrpc.StreamClientInterceptor()))
+	}
+	sched, err := schedulerclient.GetClientByAddr(opt.Scheduler.NetAddrs, opts...)
if err != nil { return nil, errors.Wrap(err, "failed to get schedulers") } diff --git a/client/daemon/peer/peertask_file.go b/client/daemon/peer/peertask_file.go index f8e6384aa35..badb4dcf345 100644 --- a/client/daemon/peer/peertask_file.go +++ b/client/daemon/peer/peertask_file.go @@ -88,14 +88,14 @@ func newFilePeerTask(ctx context.Context, logger.Infof("request overview, url: %s, filter: %s, meta: %s, biz: %s, peer: %s", request.Url, request.UrlMeta.Filter, request.UrlMeta, request.UrlMeta.Tag, request.PeerId) // trace register regCtx, regSpan := tracer.Start(ctx, config.SpanRegisterTask) - result, err := schedulerClient.RegisterPeerTask(regCtx, request) logger.Infof("step 1: peer %s start to register", request.PeerId) + result, err := schedulerClient.RegisterPeerTask(regCtx, request) regSpan.RecordError(err) regSpan.End() var needBackSource bool if err != nil { - logger.Errorf("step 1: peer %s register failed: err", request.PeerId, err) + logger.Errorf("step 1: peer %s register failed: %v", request.PeerId, err) if schedulerOption.DisableAutoBackSource { logger.Errorf("register peer task failed: %s, peer id: %s, auto back source disabled", err, request.PeerId) span.RecordError(err) @@ -148,9 +148,8 @@ func newFilePeerTask(ctx context.Context, logger.Infof("%s/%s size scope: normal", result.TaskId, request.PeerId) } } - - peerPacketStream, err := schedulerClient.ReportPieceResult(ctx, result.TaskId, request) logger.Infof("step 2: start report peer %s piece result", request.PeerId) + peerPacketStream, err := schedulerClient.ReportPieceResult(ctx, result.TaskId, request) if err != nil { logger.Errorf("step 2: peer %s report piece failed: err", request.PeerId, err) defer span.End() diff --git a/client/daemon/peer/peertask_stream.go b/client/daemon/peer/peertask_stream.go index 6833c0036cc..b730a292451 100644 --- a/client/daemon/peer/peertask_stream.go +++ b/client/daemon/peer/peertask_stream.go @@ -71,8 +71,8 @@ func newStreamPeerTask(ctx context.Context, request.PeerId, request.Url, request.UrlMeta.Filter, request.UrlMeta, request.UrlMeta.Tag) // trace register regCtx, regSpan := tracer.Start(ctx, config.SpanRegisterTask) - result, err := schedulerClient.RegisterPeerTask(regCtx, request) logger.Infof("step 1: peer %s start to register", request.PeerId) + result, err := schedulerClient.RegisterPeerTask(regCtx, request) regSpan.RecordError(err) regSpan.End() diff --git a/cmd/cdn/cmd/root.go b/cmd/cdn/cmd/root.go index f98cf36f20a..0cc8ea94a16 100644 --- a/cmd/cdn/cmd/root.go +++ b/cmd/cdn/cmd/root.go @@ -73,7 +73,7 @@ func runCdnSystem() error { s, _ := yaml.Marshal(cfg) logger.Infof("cdn system configuration:\n%s", string(s)) - ff := dependency.InitMonitor(cfg.Verbose, cfg.PProfPort, cfg.Telemetry.Jaeger) + ff := dependency.InitMonitor(cfg.Verbose, cfg.PProfPort, cfg.Telemetry) defer ff() svr, err := server.New(cfg) diff --git a/cmd/dependency/base/option.go b/cmd/dependency/base/option.go index d4f90f2c627..c4b7f2c8265 100644 --- a/cmd/dependency/base/option.go +++ b/cmd/dependency/base/option.go @@ -25,5 +25,6 @@ type Options struct { // TelemetryOption is the option for telemetry type TelemetryOption struct { - Jaeger string `yaml:"jaeger" mapstructure:"jaeger"` + Jaeger string `yaml:"jaeger" mapstructure:"jaeger"` + ServiceName string `yaml:"serviceName" mapstructure:"serviceName"` } diff --git a/cmd/dependency/dependency.go b/cmd/dependency/dependency.go index 8f50a6ca697..3d1ba2e430f 100644 --- a/cmd/dependency/dependency.go +++ b/cmd/dependency/dependency.go @@ -27,6 +27,7 @@ import 
( "syscall" "time" + "d7y.io/dragonfly/v2/cmd/dependency/base" "github.com/go-echarts/statsview" "github.com/go-echarts/statsview/viewer" "github.com/mitchellh/mapstructure" @@ -67,6 +68,7 @@ func InitCobra(cmd *cobra.Command, useConfigFile bool, config interface{}) { flags.Bool("verbose", false, "whether logger use debug level") flags.Int("pprof-port", -1, "listen port for pprof, 0 represents random port") flags.String("jaeger", "", "jaeger endpoint url, like: http://localhost:14250/api/traces") + flags.String("service-name", fmt.Sprintf("%s-%s", "dragonfly", cmd.Name()), "name of the service for tracer") flags.String("config", "", fmt.Sprintf("the path of configuration file with yaml extension name, default is %s, it can also be set by env var: %s", filepath.Join(dfpath.DefaultConfigDir, rootName+".yaml"), strings.ToUpper(rootName+"_config"))) // Bind common flags @@ -86,7 +88,7 @@ func InitCobra(cmd *cobra.Command, useConfigFile bool, config interface{}) { } // InitMonitor initialize monitor and return final handler -func InitMonitor(verbose bool, pprofPort int, jaeger string) func() { +func InitMonitor(verbose bool, pprofPort int, otelOption base.TelemetryOption) func() { var fc = make(chan func(), 5) if verbose { @@ -116,8 +118,8 @@ func InitMonitor(verbose bool, pprofPort int, jaeger string) func() { }() } - if jaeger != "" { - ff, err := initJaegerTracer(jaeger) + if otelOption.Jaeger != "" { + ff, err := initJaegerTracer(otelOption) if err != nil { logger.Warnf("init jaeger tracer error: %v", err) } @@ -227,8 +229,8 @@ func initDecoderConfig(dc *mapstructure.DecoderConfig) { } // initTracer creates a new trace provider instance and registers it as global trace provider. -func initJaegerTracer(url string) (func(), error) { - exp, err := jaeger.NewRawExporter(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url))) +func initJaegerTracer(otelOption base.TelemetryOption) (func(), error) { + exp, err := jaeger.NewRawExporter(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(otelOption.Jaeger))) if err != nil { return nil, err } @@ -239,7 +241,7 @@ func initJaegerTracer(url string) (func(), error) { sdktrace.WithSampler(sdktrace.AlwaysSample()), // Record information about this application in an Resource. 
sdktrace.WithResource(resource.NewWithAttributes( - semconv.ServiceNameKey.String("dragonfly"), + semconv.ServiceNameKey.String(otelOption.ServiceName), semconv.ServiceInstanceIDKey.String(fmt.Sprintf("%s|%s", iputils.HostName, iputils.HostIP)), semconv.ServiceVersionKey.String(version.GitVersion))), ) diff --git a/cmd/dfget/cmd/daemon.go b/cmd/dfget/cmd/daemon.go index 42893684e18..f52f6dbb21d 100644 --- a/cmd/dfget/cmd/daemon.go +++ b/cmd/dfget/cmd/daemon.go @@ -135,7 +135,7 @@ func runDaemon() error { s, _ := yaml.Marshal(cfg) logger.Infof("client daemon configuration:\n%s", string(s)) - ff := dependency.InitMonitor(cfg.Verbose, cfg.PProfPort, cfg.Telemetry.Jaeger) + ff := dependency.InitMonitor(cfg.Verbose, cfg.PProfPort, cfg.Telemetry) defer ff() svr, err := server.New(cfg) diff --git a/cmd/dfget/cmd/root.go b/cmd/dfget/cmd/root.go index a534e5198c3..3e96304b91e 100644 --- a/cmd/dfget/cmd/root.go +++ b/cmd/dfget/cmd/root.go @@ -158,7 +158,7 @@ func runDfget() error { s, _ := yaml.Marshal(dfgetConfig) logger.Infof("client dfget configuration:\n%s", string(s)) - ff := dependency.InitMonitor(dfgetConfig.Verbose, dfgetConfig.PProfPort, dfgetConfig.Telemetry.Jaeger) + ff := dependency.InitMonitor(dfgetConfig.Verbose, dfgetConfig.PProfPort, dfgetConfig.Telemetry) defer ff() var ( diff --git a/cmd/manager/cmd/root.go b/cmd/manager/cmd/root.go index 964cde81b4b..75723a1f394 100644 --- a/cmd/manager/cmd/root.go +++ b/cmd/manager/cmd/root.go @@ -82,7 +82,7 @@ func runManager() error { logger.Infof("manager configuration:\n%s", string(s)) - ff := dependency.InitMonitor(cfg.Verbose, cfg.PProfPort, cfg.Telemetry.Jaeger) + ff := dependency.InitMonitor(cfg.Verbose, cfg.PProfPort, cfg.Telemetry) defer ff() svr, err := server.New(cfg) diff --git a/cmd/scheduler/cmd/root.go b/cmd/scheduler/cmd/root.go index 4fde5651987..107da03e9a5 100644 --- a/cmd/scheduler/cmd/root.go +++ b/cmd/scheduler/cmd/root.go @@ -22,8 +22,8 @@ import ( "d7y.io/dragonfly/v2/cmd/dependency" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/internal/dflog/logcore" + "d7y.io/dragonfly/v2/scheduler" "d7y.io/dragonfly/v2/scheduler/config" - "d7y.io/dragonfly/v2/scheduler/server" "github.com/pkg/errors" "github.com/spf13/cobra" "gopkg.in/yaml.v3" @@ -83,10 +83,10 @@ func runScheduler() error { s, _ := yaml.Marshal(cfg) logger.Infof("scheduler configuration:\n%s", string(s)) - ff := dependency.InitMonitor(cfg.Verbose, cfg.PProfPort, cfg.Telemetry.Jaeger) + ff := dependency.InitMonitor(cfg.Verbose, cfg.PProfPort, cfg.Telemetry) defer ff() - svr, err := server.New(cfg) + svr, err := scheduler.New(cfg) if err != nil { logger.Errorf("get scheduler server error: %s", err) return err diff --git a/go.sum b/go.sum index b30e224d1b1..efcc661e4dc 100644 --- a/go.sum +++ b/go.sum @@ -718,9 +718,11 @@ github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0 github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= +github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo= github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= github.com/ugorji/go v1.1.13 h1:nB3O5kBSQGjEQAcfe1aLUYuxmXdFKmYgBZhY32rQb6Q= github.com/ugorji/go v1.1.13/go.mod h1:jxau1n+/wyTGLQoCkjok9r5zFa/FxT6eI5HiHKQszjc= +github.com/ugorji/go/codec v1.1.7 
h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= github.com/ugorji/go/codec v1.1.13 h1:013LbFhocBoIqgHeIHKlV4JWYhqogATYWZhIcH0WHn4= github.com/ugorji/go/codec v1.1.13/go.mod h1:oNVt3Dq+FO91WNQ/9JnHKQP2QJxTzoN7wCBFCq1OeuU= diff --git a/internal/dfcodes/rpc_code.go b/internal/dfcodes/rpc_code.go index 531fcdff217..a1b4aeea948 100644 --- a/internal/dfcodes/rpc_code.go +++ b/internal/dfcodes/rpc_code.go @@ -51,7 +51,6 @@ const ( SchedPeerPieceResultReportFail base.Code = 5006 SchedCDNSeedFail base.Code = 5007 SchedTaskStatusError base.Code = 5008 - SchedWithoutParentPeer base.Code = 5009 // cdnsystem response error 6000-6999 CdnError base.Code = 6000 diff --git a/pkg/rpc/client.go b/pkg/rpc/client.go index 66e5134f441..438f1bce09d 100644 --- a/pkg/rpc/client.go +++ b/pkg/rpc/client.go @@ -24,7 +24,6 @@ import ( "github.com/pkg/errors" "github.com/serialx/hashring" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" @@ -94,11 +93,8 @@ var defaultClientOpts = []grpc.DialOption{ Time: 2 * time.Minute, Timeout: 10 * time.Second, }), - // TODO make grpc interceptor optional - grpc.WithChainStreamInterceptor(otelgrpc.StreamClientInterceptor(), - streamClientInterceptor), - grpc.WithChainUnaryInterceptor(otelgrpc.UnaryClientInterceptor(), - unaryClientInterceptor), + grpc.WithStreamInterceptor(streamClientInterceptor), + grpc.WithUnaryInterceptor(unaryClientInterceptor), } type ConnOption interface { diff --git a/pkg/rpc/scheduler/client/peer_packet_stream.go b/pkg/rpc/scheduler/client/peer_packet_stream.go index d3983dba01f..8c88a2c1734 100644 --- a/pkg/rpc/scheduler/client/peer_packet_stream.go +++ b/pkg/rpc/scheduler/client/peer_packet_stream.go @@ -55,7 +55,7 @@ func newPeerPacketStream(ctx context.Context, sc *schedulerClient, hashKey strin pps := &peerPacketStream{ sc: sc, - ctx: context.Background(), + ctx: ctx, hashKey: hashKey, ptr: ptr, opts: opts, diff --git a/pkg/rpc/server.go b/pkg/rpc/server.go index d0639b3bc5c..84d288d636d 100644 --- a/pkg/rpc/server.go +++ b/pkg/rpc/server.go @@ -29,7 +29,6 @@ import ( "syscall" "time" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/status" @@ -76,11 +75,8 @@ var serverOpts = []grpc.ServerOption{ MaxConnectionIdle: 5 * time.Minute, }), grpc.MaxConcurrentStreams(100), - // TODO make grpc interceptor optional - grpc.ChainStreamInterceptor(otelgrpc.StreamServerInterceptor(), - streamServerInterceptor), - grpc.ChainUnaryInterceptor(otelgrpc.UnaryServerInterceptor(), - unaryServerInterceptor), + grpc.StreamInterceptor(streamServerInterceptor), + grpc.UnaryInterceptor(unaryServerInterceptor), } var sp = struct { diff --git a/pkg/structure/sortedlist/sorted_list.go b/pkg/structure/sortedlist/sorted_list.go index efd8b828f11..606a5557bf8 100644 --- a/pkg/structure/sortedlist/sorted_list.go +++ b/pkg/structure/sortedlist/sorted_list.go @@ -80,7 +80,6 @@ func (l *SortedList) Update(data Item) (err error) { l.deleteItem(oldKey1, oldKey2, data) l.addItem(key1, key2, data) - return } diff --git a/scheduler/config/config.go b/scheduler/config/config.go index 62a019bd02d..42c05c8e5c0 100644 --- a/scheduler/config/config.go +++ b/scheduler/config/config.go @@ -116,6 +116,7 @@ func NewDefaultSchedulerConfig() *SchedulerConfig { AScheduler: "", BScheduler: "", WorkerNum: 
runtime.GOMAXPROCS(0), + BackSourceCount: 3, AccessWindow: 3 * time.Minute, CandidateParentCount: 10, Scheduler: "basic", @@ -130,8 +131,10 @@ func NewDefaultGCConfig() *GCConfig { return &GCConfig{ PeerGCInterval: 5 * time.Minute, TaskGCInterval: 5 * time.Minute, - PeerTTL: 5 * time.Minute, - TaskTTL: 1 * time.Hour, + PeerTTL: 10 * time.Minute, + PeerTTI: 3 * time.Minute, + TaskTTL: 10 * time.Minute, + TaskTTI: 3 * time.Minute, } } @@ -214,17 +217,18 @@ type DynConfig struct { } type SchedulerConfig struct { - DisableCDN bool `yaml:"disableCDN" mapstructure:"disableCDN"` - ABTest bool `yaml:"abtest" mapstructure:"abtest"` - AScheduler string `yaml:"ascheduler" mapstructure:"ascheduler"` - BScheduler string `yaml:"bscheduler" mapstructure:"bscheduler"` - WorkerNum int `yaml:"workerNum" mapstructure:"workerNum"` + DisableCDN bool `yaml:"disableCDN" mapstructure:"disableCDN"` + ABTest bool `yaml:"abtest" mapstructure:"abtest"` + AScheduler string `yaml:"ascheduler" mapstructure:"ascheduler"` + BScheduler string `yaml:"bscheduler" mapstructure:"bscheduler"` + WorkerNum int `yaml:"workerNum" mapstructure:"workerNum"` + BackSourceCount int `yaml:"backSourceCount" mapstructure:"backSourceCount"` // AccessWindow should less than CDN task expireTime AccessWindow time.Duration `yaml:"accessWindow" mapstructure:"accessWindow"` CandidateParentCount int `yaml:"candidateParentCount" mapstructure:"candidateParentCount"` Scheduler string `yaml:"scheduler" mapstructure:"scheduler"` - CDNLoad int `yaml:"cDNLoad" mapstructure:"cDNLoad"` - ClientLoad int `yaml:"clientLoad" mapstructure:"clientLoad"` + CDNLoad int `yaml:"cdnLoad" mapstructure:"cdnLoad"` + ClientLoad int32 `yaml:"clientLoad" mapstructure:"clientLoad"` OpenMonitor bool `yaml:"openMonitor" mapstructure:"openMonitor"` GC *GCConfig `yaml:"gc" mapstructure:"gc"` } @@ -237,9 +241,11 @@ type ServerConfig struct { type GCConfig struct { PeerGCInterval time.Duration `yaml:"peerGCInterval" mapstructure:"peerGCInterval"` + PeerTTL time.Duration `yaml:"peerTTL" mapstructure:"peerTTL"` + PeerTTI time.Duration `yaml:"peerTTI" mapstructure:"peerTTI"` TaskGCInterval time.Duration `yaml:"taskGCInterval" mapstructure:"taskGCInterval"` - PeerTTL time.Duration - TaskTTL time.Duration + TaskTTL time.Duration `yaml:"taskTTL" mapstructure:"taskTTL"` + TaskTTI time.Duration `yaml:"taskTTI" mapstructure:"taskTTI"` } type HostConfig struct { diff --git a/scheduler/config/constants_otel.go b/scheduler/config/constants_otel.go new file mode 100644 index 00000000000..a3196cd77ca --- /dev/null +++ b/scheduler/config/constants_otel.go @@ -0,0 +1,54 @@ +/* + * Copyright 2020 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package config + +import "go.opentelemetry.io/otel/attribute" + +const ( + AttributePeerRegisterRequest = attribute.Key("d7y.peer.register.request") + AttributeTaskSizeScope = attribute.Key("d7y.task.size.scope") + AttributeSinglePiece = attribute.Key("d7y.peer.single.piece") + AttributePieceReceived = attribute.Key("d7y.peer.piece.received") + AttributeLeavePeerID = attribute.Key("d7y.leave.peer.id") + AttributeLeaveTaskID = attribute.Key("d7y.leave.task.id") + AttributeReportPeerID = attribute.Key("d7y.report.peer.id") + AttributePeerDownloadSuccess = attribute.Key("d7y.peer.download.success") + AttributeDownloadFileURL = attribute.Key("d7y.file.url") + AttributeContentLength = attribute.Key("d7y.source.content.length") + AttributePeerDownloadResult = attribute.Key("d7y.peer.download.result") + AttributeSchedulePacket = attribute.Key("d7y.schedule.packet") + AttributeTaskID = attribute.Key("d7y.peer.task.id") + AttributePeerID = attribute.Key("d7y.peer.id") + AttributeCDNSeedRequest = attribute.Key("d7y.cdn.seed.request") +) + +const ( + SpanPeerRegister = "peer-register" + SpanTriggerCDN = "trigger-cdn" + SpanReportPieceResult = "report-piece-result" + SpanReportPeerResult = "report-peer-result" + SpanPeerLeave = "peer-leave" +) + +const ( + EventScheduleParentFail = "fail-schedule-parent" + EventPeerNotFound = "peer-not-found" + EventHostNotFound = "host-not-found" + EventCreatePeer = "create-peer" + EventPieceReceived = "receive-piece" + EventPeerDownloaded = "downloaded" +) diff --git a/scheduler/core/evaluator/basic/basic_evaluator.go b/scheduler/core/evaluator/basic/basic_evaluator.go index cf9e662620e..7d57a04a493 100644 --- a/scheduler/core/evaluator/basic/basic_evaluator.go +++ b/scheduler/core/evaluator/basic/basic_evaluator.go @@ -17,8 +17,6 @@ package basic import ( - "time" - logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/core/evaluator" @@ -44,7 +42,7 @@ func (eval *baseEvaluator) NeedAdjustParent(peer *types.Peer) bool { logger.Debugf("peer %s need adjust parent because it has not parent and status is %s", peer.PeerID, peer.GetStatus()) return true } - + // TODO Check whether the parent node is in the blacklist if peer.GetParent() != nil && eval.IsBadNode(peer.GetParent()) { logger.Debugf("peer %s need adjust parent because it current parent is bad", peer.PeerID) return true @@ -61,49 +59,27 @@ func (eval *baseEvaluator) NeedAdjustParent(peer *types.Peer) bool { } avgCost, lastCost := getAvgAndLastCost(costHistory, 4) - if avgCost*40 < lastCost { - logger.Debugf("peer %s is bad because recent pieces have taken too long to download", peer.PeerID) - } // TODO adjust policy - result := (avgCost * 20) < lastCost - if result == true { + if (avgCost * 20) < lastCost { logger.Debugf("peer %s need adjust parent because it latest download cost is too time consuming", peer.PeerID) + return true } - return result + return false } func (eval *baseEvaluator) IsBadNode(peer *types.Peer) bool { - if peer.Host.CDN { - return false - } - if peer.IsBad() { logger.Debugf("peer %s is bad because status is %s", peer.PeerID, peer.GetStatus()) return true } - - if peer.IsBlocking() { - return false - } - - parent := peer.GetParent() - - if parent == nil { - return false - } - if time.Now().After(peer.GetLastAccessTime().Add(5 * time.Second)) { - logger.Debugf("peer %s is bad because have elapsed %s > 5s since the last access", peer.PeerID, time.Now().Sub(peer.GetLastAccessTime())) - return true - } - - costHistory := 
parent.GetCostHistory() + costHistory := peer.GetCostHistory() if len(costHistory) < 4 { return false } avgCost, lastCost := getAvgAndLastCost(costHistory, 4) - if avgCost*40 < lastCost { + if avgCost*40 < lastCost && !peer.Host.CDN { logger.Debugf("peer %s is bad because recent pieces have taken too long to download avg[%d] last[%d]", peer.PeerID, avgCost, lastCost) return true } @@ -111,13 +87,13 @@ func (eval *baseEvaluator) IsBadNode(peer *types.Peer) bool { return false } -// The bigger the better -func (eval *baseEvaluator) Evaluate(dst *types.Peer, src *types.Peer) float64 { - profits := getProfits(dst, src) +// Evaluate The bigger, the better +func (eval *baseEvaluator) Evaluate(parent *types.Peer, child *types.Peer) float64 { + profits := getProfits(parent, child) - load := getHostLoad(dst.Host) + load := getHostLoad(parent.Host) - dist := getDistance(dst, src) + dist := getDistance(parent, child) return profits * load * dist } @@ -138,7 +114,7 @@ func getAvgAndLastCost(list []int, splitPos int) (avgCost, lastCost int) { // getProfits 0.0~unlimited larger and better func getProfits(dst *types.Peer, src *types.Peer) float64 { - diff := types.GetDiffPieceNum(src, dst) + diff := types.GetDiffPieceNum(dst, src) depth := dst.GetDepth() return float64(int(diff+1)*src.GetWholeTreeNode()) / float64(depth*depth) diff --git a/scheduler/core/evaluator/evaluator.go b/scheduler/core/evaluator/evaluator.go index 87cb0fb392b..75da4550db3 100644 --- a/scheduler/core/evaluator/evaluator.go +++ b/scheduler/core/evaluator/evaluator.go @@ -29,7 +29,7 @@ import ( type Evaluator interface { // Evaluate todo Normalization - Evaluate(dst *types.Peer, src *types.Peer) float64 + Evaluate(parent *types.Peer, child *types.Peer) float64 // NeedAdjustParent determine whether the peer needs a new parent node NeedAdjustParent(peer *types.Peer) bool diff --git a/scheduler/core/events.go b/scheduler/core/events.go index 164a2bfcaae..9a44662cdc5 100644 --- a/scheduler/core/events.go +++ b/scheduler/core/events.go @@ -25,9 +25,11 @@ import ( "d7y.io/dragonfly/v2/pkg/rpc/base" schedulerRPC "d7y.io/dragonfly/v2/pkg/rpc/scheduler" "d7y.io/dragonfly/v2/pkg/structure/sortedlist" + "d7y.io/dragonfly/v2/pkg/synclock" "d7y.io/dragonfly/v2/scheduler/core/scheduler" "d7y.io/dragonfly/v2/scheduler/daemon" "d7y.io/dragonfly/v2/scheduler/types" + "go.opentelemetry.io/otel/trace" "k8s.io/client-go/util/workqueue" ) @@ -43,60 +45,48 @@ type state struct { waitScheduleParentPeerQueue workqueue.DelayingInterface } -func newState(sched scheduler.Scheduler, peerManager daemon.PeerMgr, cdnManager daemon.CDNMgr) *state { +func newState(sched scheduler.Scheduler, peerManager daemon.PeerMgr, cdnManager daemon.CDNMgr, wsdq workqueue.DelayingInterface) *state { return &state{ sched: sched, peerManager: peerManager, cdnManager: cdnManager, - waitScheduleParentPeerQueue: workqueue.NewNamedDelayingQueue("wait reSchedule parent"), + waitScheduleParentPeerQueue: wsdq, } } -func (s *state) start() { - for { - v, shutdown := s.waitScheduleParentPeerQueue.Get() - if shutdown { - break - } - peer := v.(*types.Peer) - if peer.IsDone() || peer.IsLeave() { - logger.WithTaskAndPeerID(peer.Task.TaskID, - peer.PeerID).Debugf("waitScheduleParentPeerQueue: peer has left from waitScheduleParentPeerQueue because peer is done or leave, "+ - "peer status is %s, "+ - "isLeave %t", peer.GetStatus(), peer.IsLeave()) - s.waitScheduleParentPeerQueue.Done(v) - continue - } - parent, candidates, hashParent := s.sched.ScheduleParent(peer) - if !hashParent && 
!peer.Host.CDN { - logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Warnf("waitScheduleParentPeerQueue: there is no available parent, reschedule it in one second") - s.waitScheduleParentPeerQueue.Done(v) - s.waitScheduleParentPeerQueue.AddAfter(peer, time.Second) - continue - } - peer.SendSchedulePacket(constructSuccessPeerPacket(peer, parent, candidates)) - logger.WithTaskAndPeerID(peer.Task.TaskID, - peer.PeerID).Debugf("waitScheduleParentPeerQueue: peer has left from waitScheduleParentPeerQueue because it has scheduled new parent %v", parent) - s.waitScheduleParentPeerQueue.Done(v) - } +type reScheduleParentEvent struct { + peer *types.Peer } -func (s *state) stop() { - if !s.waitScheduleParentPeerQueue.ShuttingDown() { - s.waitScheduleParentPeerQueue.ShutDown() - } +var _ event = reScheduleParentEvent{} + +func (e reScheduleParentEvent) apply(s *state) { + reScheduleParent(e.peer, s) +} + +func (e reScheduleParentEvent) hashKey() string { + return e.peer.Task.TaskID } type startReportPieceResultEvent struct { + ctx context.Context peer *types.Peer } var _ event = startReportPieceResultEvent{} func (e startReportPieceResultEvent) apply(s *state) { + span := trace.SpanFromContext(e.ctx) + if e.peer.GetParent() != nil { + span.AddEvent("no parent") + logger.WithTaskAndPeerID(e.peer.Task.TaskID, + e.peer.PeerID).Warnf("startReportPieceResultEvent: no need schedule parent because peer already had parent %s", e.peer.GetParent().PeerID) + return + } parent, candidates, hasParent := s.sched.ScheduleParent(e.peer) + span.AddEvent("parent") if !hasParent { - logger.WithTaskAndPeerID(e.peer.Task.TaskID, e.peer.PeerID).Warnf("peerScheduleParentEvent: there is no available parent, reschedule it in one second") + logger.WithTaskAndPeerID(e.peer.Task.TaskID, e.peer.PeerID).Warnf("startReportPieceResultEvent: there is no available parent,reschedule it in one second") s.waitScheduleParentPeerQueue.AddAfter(e.peer, time.Second) return } @@ -108,6 +98,7 @@ func (e startReportPieceResultEvent) hashKey() string { } type peerDownloadPieceSuccessEvent struct { + ctx context.Context peer *types.Peer pr *schedulerRPC.PieceResult } @@ -115,11 +106,14 @@ type peerDownloadPieceSuccessEvent struct { var _ event = peerDownloadPieceSuccessEvent{} func (e peerDownloadPieceSuccessEvent) apply(s *state) { + span := trace.SpanFromContext(e.ctx) + span.AddEvent("piece success") e.peer.AddPieceInfo(e.pr.FinishedCount, int(e.pr.EndTime-e.pr.BeginTime)) oldParent := e.peer.GetParent() var candidates []*types.Peer parentPeer, ok := s.peerManager.Get(e.pr.DstPid) if !ok || parentPeer.IsLeave() { + e.peer.ReplaceParent(nil) var hasParent bool parentPeer, candidates, hasParent = s.sched.ScheduleParent(e.peer) if !hasParent { @@ -152,21 +146,28 @@ func (e peerDownloadPieceFailEvent) apply(s *state) { switch e.pr.Code { case dfcodes.PeerTaskNotFound, dfcodes.ClientPieceRequestFail, dfcodes.ClientPieceDownloadFail: // TODO PeerTaskNotFound remove dest peer task, ClientPieceDownloadFail add blank list - handleReplaceParent(e.peer, s) + reScheduleParent(e.peer, s) return case dfcodes.CdnTaskNotFound, dfcodes.CdnError, dfcodes.CdnTaskRegistryFail, dfcodes.CdnTaskDownloadFail: go func(task *types.Task) { + // TODO + synclock.Lock(task.TaskID, false) + defer synclock.UnLock(task.TaskID, false) task.SetStatus(types.TaskStatusRunning) - if err := s.cdnManager.StartSeedTask(context.Background(), task); err != nil { + if cdnPeer, err := s.cdnManager.StartSeedTask(context.Background(), task); err != nil { logger.Errorf("start seed 
task fail: %v", err) task.SetStatus(types.TaskStatusFailed) handleSeedTaskFail(task) - return + } else { + logger.Debugf("===== successfully obtain seeds from cdn, task: %+v =====", e.peer.Task) + children := s.sched.ScheduleChildren(cdnPeer) + for _, child := range children { + child.SendSchedulePacket(constructSuccessPeerPacket(child, cdnPeer, nil)) + } } - logger.Debugf("===== successfully obtain seeds from cdn, task: %+v =====", e.peer.Task) }(e.peer.Task) default: - handleReplaceParent(e.peer, s) + reScheduleParent(e.peer, s) return } } @@ -174,20 +175,6 @@ func (e peerDownloadPieceFailEvent) hashKey() string { return e.peer.Task.TaskID } -type peerReplaceParentEvent struct { - peer *types.Peer -} - -func (e peerReplaceParentEvent) hashKey() string { - return e.peer.Task.TaskID -} - -func (e peerReplaceParentEvent) apply(s *state) { - handleReplaceParent(e.peer, s) -} - -var _ event = peerReplaceParentEvent{} - type taskSeedFailEvent struct { task *types.Task } @@ -244,7 +231,6 @@ func (e peerDownloadFailEvent) apply(s *state) { child.SendSchedulePacket(constructSuccessPeerPacket(child, parent, candidates)) return true }) - s.peerManager.Delete(e.peer.PeerID) } func (e peerDownloadFailEvent) hashKey() string { @@ -252,13 +238,27 @@ func (e peerDownloadFailEvent) hashKey() string { } type peerLeaveEvent struct { + ctx context.Context peer *types.Peer } var _ event = peerLeaveEvent{} func (e peerLeaveEvent) apply(s *state) { - handlePeerLeave(e.peer, s) + e.peer.MarkLeave() + removePeerFromCurrentTree(e.peer, s) + e.peer.GetChildren().Range(func(key, value interface{}) bool { + child := value.(*types.Peer) + parent, candidates, hasParent := s.sched.ScheduleParent(child) + if !hasParent { + logger.WithTaskAndPeerID(child.Task.TaskID, child.PeerID).Warnf("handlePeerLeave: there is no available parent,reschedule it in one second") + s.waitScheduleParentPeerQueue.AddAfter(child, time.Second) + return true + } + child.SendSchedulePacket(constructSuccessPeerPacket(child, parent, candidates)) + return true + }) + s.peerManager.Delete(e.peer.PeerID) } func (e peerLeaveEvent) hashKey() string { @@ -299,27 +299,10 @@ func constructFailPeerPacket(peer *types.Peer, errCode base.Code) *schedulerRPC. 
} } -func handlePeerLeave(peer *types.Peer, s *state) { - peer.MarkLeave() - removePeerFromCurrentTree(peer, s) - peer.GetChildren().Range(func(key, value interface{}) bool { - child := value.(*types.Peer) - parent, candidates, hasParent := s.sched.ScheduleParent(child) - if !hasParent { - logger.WithTaskAndPeerID(child.Task.TaskID, child.PeerID).Warnf("handlePeerLeave: there is no available parent, reschedule it in one second") - s.waitScheduleParentPeerQueue.AddAfter(child, time.Second) - return true - } - child.SendSchedulePacket(constructSuccessPeerPacket(child, parent, candidates)) - return true - }) - s.peerManager.Delete(peer.PeerID) -} - -func handleReplaceParent(peer *types.Peer, s *state) { +func reScheduleParent(peer *types.Peer, s *state) { parent, candidates, hasParent := s.sched.ScheduleParent(peer) if !hasParent { - logger.Errorf("handleReplaceParent: failed to schedule parent to peer %s", peer.PeerID) + logger.Errorf("handleReplaceParent: failed to schedule parent to peer %s, reschedule it in one second", peer.PeerID) //peer.PacketChan <- constructFailPeerPacket(peer, dfcodes.SchedWithoutParentPeer) s.waitScheduleParentPeerQueue.AddAfter(peer, time.Second) return @@ -331,7 +314,7 @@ func handleSeedTaskFail(task *types.Task) { if task.IsFail() { task.ListPeers().Range(func(data sortedlist.Item) bool { peer := data.(*types.Peer) - peer.SendSchedulePacket(constructFailPeerPacket(peer, dfcodes.CdnError)) + peer.SendSchedulePacket(constructFailPeerPacket(peer, dfcodes.SchedNeedBackSource)) return true }) } diff --git a/scheduler/core/monitor.go b/scheduler/core/monitor.go index dd23de57473..dd9c74f2465 100644 --- a/scheduler/core/monitor.go +++ b/scheduler/core/monitor.go @@ -19,6 +19,7 @@ package core import ( "bytes" "fmt" + "sort" "strconv" "strings" "time" @@ -30,19 +31,16 @@ import ( "k8s.io/client-go/util/workqueue" ) -const ( - PeerGoneTimeout = time.Second * 10 - PeerForceGoneTimeout = time.Minute * 2 -) - type monitor struct { downloadMonitorQueue workqueue.DelayingInterface peerManager daemon.PeerMgr - done chan struct{} log *zap.SugaredLogger } -func newMonitor(peerManager daemon.PeerMgr) *monitor { +func newMonitor(openMonitor bool, peerManager daemon.PeerMgr) *monitor { + if !openMonitor { + return nil + } config := zap.NewDevelopmentConfig() logger, _ := config.Build() return &monitor{ @@ -52,47 +50,49 @@ func newMonitor(peerManager daemon.PeerMgr) *monitor { } } -func (m *monitor) start() { +func (m *monitor) start(done <-chan struct{}) { ticker := time.NewTicker(time.Second * 10) for { select { case <-ticker.C: m.log.Info(m.printDebugInfo()) - case <-m.done: + case <-done: return } } } -func (m *monitor) stop() { - close(m.done) -} - func (m *monitor) printDebugInfo() string { - var roots []*types.Peer - - buffer := bytes.NewBuffer([]byte{}) - table := tablewriter.NewWriter(buffer) - table.SetHeader([]string{"PeerID", "URL", "parent node", "status", "start time", "Finished Piece Num", "Finished", "Free Load"}) + var peers, roots []*types.Peer m.peerManager.ListPeers().Range(func(key interface{}, value interface{}) (ok bool) { ok = true peer := value.(*types.Peer) if peer == nil { + m.log.Error("encounter a nil peer") return } - parentNode := "" if peer.GetParent() == nil { roots = append(roots, peer) - } else { - parentNode = peer.GetParent().PeerID } - - table.Append([]string{peer.PeerID, peer.Task.URL[len(peer.Task.URL)-15 : len(peer.Task.URL)], parentNode, peer.GetStatus().String(), - peer.CreateTime.String(), - strconv.Itoa(int(peer.GetFinishedNum())), - 
strconv.FormatBool(peer.IsSuccess()), strconv.Itoa(peer.Host.GetFreeUploadLoad())}) + peers = append(peers, peer) return }) + sort.Slice(peers, func(i, j int) bool { + return peers[i].GetStatus() > peers[j].GetStatus() + }) + buffer := bytes.NewBuffer([]byte{}) + table := tablewriter.NewWriter(buffer) + table.SetHeader([]string{"PeerID", "TaskID", "URL", "Parent", "Status", "start time", "Finished Piece Num", "Finished", "Free Load"}) + + for _, peer := range peers { + parentNode := "" + if peer.GetParent() != nil { + parentNode = peer.GetParent().PeerID + } + table.Append([]string{peer.PeerID, peer.Task.TaskID, peer.Task.URL[len(peer.Task.URL)-15 : len(peer.Task.URL)], parentNode, peer.GetStatus().String(), + peer.CreateTime.String(), strconv.Itoa(int(peer.GetFinishedNum())), + strconv.FormatBool(peer.IsSuccess()), strconv.Itoa(peer.Host.GetFreeUploadLoad())}) + } table.Render() var msgs []string @@ -103,15 +103,12 @@ func (m *monitor) printDebugInfo() string { if node == nil { return } - nPath := append(path, fmt.Sprintf("%s(%d)", node.PeerID, node.GetWholeTreeNode())) + nPath := append(path, fmt.Sprintf("%s(%d)(%s)", node.PeerID, node.GetWholeTreeNode(), node.GetStatus())) if len(path) >= 1 { msgs = append(msgs, node.PeerID+" || "+strings.Join(nPath, "-")) } node.GetChildren().Range(func(key, value interface{}) bool { child := (value).(*types.Peer) - if child == nil { - return true - } printTree(child, nPath) return true }) @@ -121,65 +118,6 @@ func (m *monitor) printDebugInfo() string { printTree(root, nil) } - msg := "============\n" + strings.Join(append(msgs, strconv.Itoa(table.NumLines())), "\n") + "\n===============" + msg := "============\n" + strings.Join(append(msgs, "peer count: "+strconv.Itoa(table.NumLines())), "\n") + "\n===============" return msg } - -func (m *monitor) RefreshDownloadMonitor(peer *types.Peer) { - m.log.With("taskID", peer.Task.TaskID, "peerID", peer.PeerID).Debugf("downloadMonitorWorkingLoop refresh ") - if !peer.IsRunning() { - m.downloadMonitorQueue.AddAfter(peer, time.Second*2) - } else if peer.IsBlocking() { - m.downloadMonitorQueue.AddAfter(peer, time.Second*2) - } else { - delay := time.Millisecond * time.Duration(peer.GetCost()*10) - if delay < time.Millisecond*20 { - delay = time.Millisecond * 20 - } - m.downloadMonitorQueue.AddAfter(peer, delay) - } -} - -// downloadMonitorWorkingLoop monitor peers download -func (m *monitor) downloadMonitorWorkingLoop() { - for { - v, shutdown := m.downloadMonitorQueue.Get() - if shutdown { - m.log.Infof("download monitor working loop closed") - break - } - //if m.downloadMonitorCallBack != nil { - peer := v.(*types.Peer) - if peer != nil { - m.log.With("taskID", peer.Task.TaskID, "peerID", peer.PeerID).Debugf("downloadMonitorWorkingLoop status[%d]", peer.GetStatus()) - if peer.IsSuccess() || peer.Host.CDN { - // clear from monitor - } else { - if !peer.IsRunning() { - // peer do not report for a long time, peer gone - if time.Now().After(peer.GetLastAccessTime().Add(PeerGoneTimeout)) { - peer.MarkLeave() - //pt.SendError(dferrors.New(dfcodes.SchedPeerGone, "report time out")) - } - //m.downloadMonitorCallBack(peer) - } else if !peer.IsBlocking() { - //m.downloadMonitorCallBack(peer) - } else { - if time.Now().After(peer.GetLastAccessTime().Add(PeerForceGoneTimeout)) { - peer.MarkLeave() - //pt.SendError(dferrors.New(dfcodes.SchedPeerGone, "report fource time out")) - } - //m.downloadMonitorCallBack(peer) - } - //_, ok := m.Get(pt.GetPeerID()) - //status := pt.Status - //if ok && !pt.Success && status != 
types.PeerStatusNodeGone && status != types.PeerStatusLeaveNode { - // m.RefreshDownloadMonitor(pt) - //} - } - } - //} - - m.downloadMonitorQueue.Done(v) - } -} diff --git a/scheduler/core/scheduler/basic/basic_scheduler.go b/scheduler/core/scheduler/basic/basic_scheduler.go index a2309bb6831..9b492ccc468 100644 --- a/scheduler/core/scheduler/basic/basic_scheduler.go +++ b/scheduler/core/scheduler/basic/basic_scheduler.go @@ -47,17 +47,17 @@ func newBasicSchedulerBuilder() scheduler.Builder { var _ scheduler.Builder = (*basicSchedulerBuilder)(nil) func (builder *basicSchedulerBuilder) Build(cfg *config.SchedulerConfig, opts *scheduler.BuildOptions) (scheduler.Scheduler, error) { - logger.Debugf("start create basic scheduler") + logger.Debugf("start create basic scheduler...") evalFactory := evaluator.NewEvaluatorFactory(cfg) evalFactory.Register("default", basic.NewEvaluator(cfg)) evalFactory.RegisterGetEvaluatorFunc(0, func(taskID string) (string, bool) { return "default", true }) - sch := &Scheduler{ + sched := &Scheduler{ evaluator: evalFactory, peerManager: opts.PeerManager, cfg: cfg, } logger.Debugf("create basic scheduler successfully") - return sch, nil + return sched, nil } func (builder *basicSchedulerBuilder) Name() string { @@ -71,9 +71,9 @@ type Scheduler struct { } func (s *Scheduler) ScheduleChildren(peer *types.Peer) (children []*types.Peer) { - logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debug("start scheduler children flow") + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debug("start schedule children flow") if s.evaluator.IsBadNode(peer) { - logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debug("peer is badNode") + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debug("stop schedule children flow because peer is bad node") return } freeUpload := peer.Host.GetFreeUploadLoad() @@ -106,40 +106,39 @@ func (s *Scheduler) ScheduleChildren(peer *types.Peer) (children []*types.Peer) } func (s *Scheduler) ScheduleParent(peer *types.Peer) (*types.Peer, []*types.Peer, bool) { - logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debug("start scheduler parent flow") - if !s.evaluator.NeedAdjustParent(peer) { - //logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("peer does not need to replace the parent node, peer is %v and current parent is %v", - // peer, peer.GetParent()) - if peer.GetParent() == nil { - return nil, nil, false - } - return peer.GetParent(), []*types.Peer{peer.GetParent()}, true - } + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debug("start schedule parent flow") + //if !s.evaluator.NeedAdjustParent(peer) { + // logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("stop schedule parent flow because peer is not need adjust parent", peer.PeerID) + // if peer.GetParent() == nil { + // return nil, nil, false + // } + // return peer.GetParent(), []*types.Peer{peer.GetParent()}, true + //} candidateParents := s.selectCandidateParents(peer, s.cfg.CandidateParentCount) - logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("select num %d parent candidates %v", len(candidateParents), candidateParents) - var value float64 - var primary = peer.GetParent() + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("select num %d candidates parents, current task tree node count %d ", + len(candidateParents), peer.Task.ListPeers().Size()) + if len(candidateParents) == 0 { + return nil, nil, false + } + evalResult := make(map[float64]*types.Peer) + var evalScore []float64 for _, 
candidate := range candidateParents { - worth := s.evaluator.Evaluate(candidate, peer) - - // scheduler the same parent, worth reduce a half - //if peer.GetParent() != nil && peer.GetParent().PeerID == candidate.PeerID { - // worth = worth / 2.0 - //} - - if worth > value { - value = worth - primary = candidate - } + score := s.evaluator.Evaluate(candidate, peer) + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("evaluate score candidate %s is %f", candidate.PeerID, score) + evalResult[score] = candidate + evalScore = append(evalScore, score) } - if primary != nil { - if primary != peer.GetParent() { - peer.ReplaceParent(primary) - } - return primary, candidateParents, true + sort.Float64s(evalScore) + var parents = make([]*types.Peer, 0, len(candidateParents)) + for i := range evalScore { + parent := evalResult[evalScore[len(evalScore)-i-1]] + parents = append(parents, parent) + } + if parents[0] != peer.GetParent() { + peer.ReplaceParent(parents[0]) } - logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID) - return nil, nil, false + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("primary parent %s is selected", parents[0].PeerID) + return parents[0], parents[1:], true } func (s *Scheduler) selectCandidateChildren(peer *types.Peer, limit int) (list []*types.Peer) { @@ -197,6 +196,7 @@ func (s *Scheduler) selectCandidateChildren(peer *types.Peer, limit int) (list [ func (s *Scheduler) selectCandidateParents(peer *types.Peer, limit int) (list []*types.Peer) { if !peer.Task.CanSchedule() { + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("++++++peer %s can not be scheduled because task status", peer.PeerID) return nil } return s.peerManager.PickReverse(peer.Task, limit, func(candidateNode *types.Peer) bool { diff --git a/scheduler/core/scheduler_service.go b/scheduler/core/service.go similarity index 69% rename from scheduler/core/scheduler_service.go rename to scheduler/core/service.go index e10f70f2e69..e1f8b26a4ea 100644 --- a/scheduler/core/scheduler_service.go +++ b/scheduler/core/service.go @@ -31,6 +31,7 @@ import ( "d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/core/scheduler" "d7y.io/dragonfly/v2/scheduler/daemon" + "d7y.io/dragonfly/v2/scheduler/daemon/cdn" "d7y.io/dragonfly/v2/scheduler/daemon/cdn/d7y" "d7y.io/dragonfly/v2/scheduler/daemon/cdn/source" "d7y.io/dragonfly/v2/scheduler/daemon/host" @@ -38,6 +39,7 @@ import ( "d7y.io/dragonfly/v2/scheduler/daemon/task" "d7y.io/dragonfly/v2/scheduler/types" "github.com/pkg/errors" + "k8s.io/client-go/util/workqueue" ) type SchedulerService struct { @@ -50,12 +52,14 @@ type SchedulerService struct { // Peer mgr peerManager daemon.PeerMgr - sched scheduler.Scheduler - worker worker - config *config.SchedulerConfig - wg sync.WaitGroup - monitor *monitor - stopOnce sync.Once + sched scheduler.Scheduler + worker worker + config *config.SchedulerConfig + monitor *monitor + startOnce sync.Once + stopOnce sync.Once + done chan struct{} + wg sync.WaitGroup } func NewSchedulerService(cfg *config.SchedulerConfig, dynConfig config.DynconfigInterface) (*SchedulerService, error) { @@ -65,17 +69,20 @@ func NewSchedulerService(cfg *config.SchedulerConfig, dynConfig config.Dynconfig } hostManager := host.NewManager() peerManager := peer.NewManager(cfg.GC, hostManager) - cdnManager := source.NewManager() - if !cfg.DisableCDN { - cdnManager, err = d7y.NewManager(dynConfigData.CDNs, peerManager, hostManager) - if err != nil { + var cdnManager daemon.CDNMgr + if cfg.DisableCDN { + if 
cdnManager, err = source.NewManager(peerManager, hostManager); err != nil { + return nil, errors.Wrap(err, "new back source cdn manager") + } + } else { + if cdnManager, err = d7y.NewManager(dynConfigData.CDNs, peerManager, hostManager); err != nil { return nil, errors.Wrap(err, "new cdn manager") } dynConfig.Register(cdnManager) hostManager.OnNotify(dynConfigData) dynConfig.Register(hostManager) } - taskManager := task.NewManager(cfg.GC) + taskManager := task.NewManager(cfg.GC, peerManager) sched, err := scheduler.Get(cfg.Scheduler).Build(cfg, &scheduler.BuildOptions{ PeerManager: peerManager, }) @@ -84,10 +91,8 @@ func NewSchedulerService(cfg *config.SchedulerConfig, dynConfig config.Dynconfig } work := newEventLoopGroup(cfg.WorkerNum) - var downloadMonitor *monitor - if cfg.OpenMonitor { - downloadMonitor = newMonitor(peerManager) - } + downloadMonitor := newMonitor(cfg.OpenMonitor, peerManager) + done := make(chan struct{}) return &SchedulerService{ cdnManager: cdnManager, taskManager: taskManager, @@ -97,25 +102,63 @@ func NewSchedulerService(cfg *config.SchedulerConfig, dynConfig config.Dynconfig monitor: downloadMonitor, sched: sched, config: cfg, + done: done, }, nil } func (s *SchedulerService) Serve() { - go s.worker.start(newState(s.sched, s.peerManager, s.cdnManager)) - if s.monitor != nil { - go s.monitor.start() + s.startOnce.Do(func() { + s.wg.Add(3) + wsdq := workqueue.NewNamedDelayingQueue("wait reSchedule parent") + go s.runWorkerLoop(wsdq) + go s.runReScheduleParentLoop(wsdq) + go s.runMonitor() + logger.Debugf("start scheduler service successfully") + }) +} + +func (s *SchedulerService) runWorkerLoop(wsdq workqueue.DelayingInterface) { + defer s.wg.Done() + s.worker.start(newState(s.sched, s.peerManager, s.cdnManager, wsdq)) +} + +func (s *SchedulerService) runReScheduleParentLoop(wsdq workqueue.DelayingInterface) { + defer s.wg.Done() + for { + select { + case <-s.done: + wsdq.ShutDown() + return + default: + v, shutdown := wsdq.Get() + if shutdown { + break + } + peer := v.(*types.Peer) + wsdq.Done(v) + if peer.IsDone() || peer.IsLeave() { + logger.WithTaskAndPeerID(peer.Task.TaskID, + peer.PeerID).Debugf("runReScheduleLoop: peer has left from waitScheduleParentPeerQueue because peer is done or leave, peer status is %s, "+ + "isLeave %t", peer.GetStatus(), peer.IsLeave()) + continue + } + s.worker.send(reScheduleParentEvent{peer}) + } } - logger.Debugf("start scheduler service successfully") } +func (s *SchedulerService) runMonitor() { + defer s.wg.Done() + if s.monitor != nil { + s.monitor.start(s.done) + } +} func (s *SchedulerService) Stop() { s.stopOnce.Do(func() { + close(s.done) if s.worker != nil { s.worker.stop() } - if s.monitor != nil { - s.monitor.stop() - } }) } @@ -173,7 +216,7 @@ func (s *SchedulerService) GetOrCreateTask(ctx context.Context, task *types.Task } synclock.UnLock(task.TaskID, true) // do trigger - task.SetLastTriggerTime(time.Now()) + task.UpdateLastTriggerTime(time.Now()) // register cdn peer task // notify peer tasks synclock.Lock(task.TaskID, false) @@ -184,12 +227,9 @@ func (s *SchedulerService) GetOrCreateTask(ctx context.Context, task *types.Task if task.IsFrozen() { task.SetStatus(types.TaskStatusRunning) } - //if s.config.DisableCDN { - // TODO NeedBackSource - //} go func() { - if err := s.cdnManager.StartSeedTask(ctx, task); err != nil { - if !task.IsSuccess() { + if cdnPeer, err := s.cdnManager.StartSeedTask(ctx, task); err != nil { + if errors.Cause(err) != cdn.ErrCDNInvokeFail { task.SetStatus(types.TaskStatusFailed) } 
logger.Errorf("failed to seed task: %v", err) @@ -197,19 +237,23 @@ func (s *SchedulerService) GetOrCreateTask(ctx context.Context, task *types.Task logger.Error("failed to send taskSeed fail event, eventLoop is shutdown") } } else { + if ok = s.worker.send(peerDownloadSuccessEvent{cdnPeer, nil}); !ok { + logger.Error("failed to send taskSeed fail event, eventLoop is shutdown") + } logger.Debugf("===== successfully obtain seeds from cdn, task: %+v ====", task) } }() return task, nil } -func (s *SchedulerService) HandlePieceResult(peer *types.Peer, pieceResult *schedulerRPC.PieceResult) error { +func (s *SchedulerService) HandlePieceResult(ctx context.Context, peer *types.Peer, pieceResult *schedulerRPC.PieceResult) error { peer.Touch() if pieceResult.PieceNum == common.ZeroOfPiece { - s.worker.send(startReportPieceResultEvent{peer}) + s.worker.send(startReportPieceResultEvent{ctx, peer}) return nil } else if pieceResult.Success { s.worker.send(peerDownloadPieceSuccessEvent{ + ctx: ctx, peer: peer, pr: pieceResult, }) @@ -224,7 +268,7 @@ func (s *SchedulerService) HandlePieceResult(peer *types.Peer, pieceResult *sche return nil } -func (s *SchedulerService) HandlePeerResult(peer *types.Peer, peerResult *schedulerRPC.PeerResult) error { +func (s *SchedulerService) HandlePeerResult(ctx context.Context, peer *types.Peer, peerResult *schedulerRPC.PeerResult) error { peer.Touch() if peerResult.Success { if !s.worker.send(peerDownloadSuccessEvent{peer: peer, peerResult: peerResult}) { @@ -236,9 +280,12 @@ func (s *SchedulerService) HandlePeerResult(peer *types.Peer, peerResult *schedu return nil } -func (s *SchedulerService) HandleLeaveTask(peer *types.Peer) error { +func (s *SchedulerService) HandleLeaveTask(ctx context.Context, peer *types.Peer) error { peer.Touch() - if !s.worker.send(peerLeaveEvent{peer: peer}) { + if !s.worker.send(peerLeaveEvent{ + ctx: ctx, + peer: peer, + }) { logger.Errorf("send peer leave event failed") } return nil diff --git a/scheduler/core/worker.go b/scheduler/core/worker.go index 2c14a16280a..9680abba205 100644 --- a/scheduler/core/worker.go +++ b/scheduler/core/worker.go @@ -32,7 +32,6 @@ type workerGroup struct { workerNum int workerList []*baseWorker stopCh chan struct{} - s *state } var _ worker = (*workerGroup)(nil) @@ -46,14 +45,11 @@ func newEventLoopGroup(workerNum int) worker { } func (wg *workerGroup) start(s *state) { - wg.s = s - go s.start() for i := 0; i < wg.workerNum; i++ { w := newWorker() go w.start(s) wg.workerList = append(wg.workerList, w) } - logger.Infof("start scheduler worker number:%d", wg.workerNum) } @@ -64,7 +60,6 @@ func (wg *workerGroup) send(e event) bool { func (wg *workerGroup) stop() { close(wg.stopCh) - wg.s.stop() for _, worker := range wg.workerList { worker.stop() } diff --git a/scheduler/daemon/cdn/d7y/manager.go b/scheduler/daemon/cdn/d7y/manager.go index 5bedc2f860a..ea55d14b961 100644 --- a/scheduler/daemon/cdn/d7y/manager.go +++ b/scheduler/daemon/cdn/d7y/manager.go @@ -22,6 +22,7 @@ import ( "io" "io/ioutil" "net/http" + "reflect" "sync" "d7y.io/dragonfly/v2/internal/dfcodes" @@ -32,21 +33,21 @@ import ( "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem/client" "d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/daemon" + "d7y.io/dragonfly/v2/scheduler/daemon/cdn" "d7y.io/dragonfly/v2/scheduler/types" "github.com/pkg/errors" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" ) -var ErrCDNRegisterFail = errors.New("cdn task register failed") +var tracer trace.Tracer -var ErrCDNDownloadFail = 
errors.New("cdn task download failed") - -var ErrCDNUnknown = errors.New("cdn obtain seed encounter unknown err") - -var ErrCDNInvokeFail = errors.New("invoke cdn interface failed") - -var ErrInitCDNPeerFail = errors.New("init cdn peer failed") +func init() { + tracer = otel.Tracer("scheduler-cdn") +} type manager struct { + cdnAddrs []dfnet.NetAddr client client.CdnClient peerManager daemon.PeerMgr hostManager daemon.HostMgr @@ -55,11 +56,13 @@ type manager struct { func NewManager(cdnServers []*config.CDN, peerManager daemon.PeerMgr, hostManager daemon.HostMgr) (daemon.CDNMgr, error) { // Initialize CDNManager client - cdnClient, err := client.GetClientByAddr(cdnHostsToNetAddrs(cdnServers)) + cdnAddrs := cdnHostsToNetAddrs(cdnServers) + cdnClient, err := client.GetClientByAddr(cdnAddrs) if err != nil { return nil, errors.Wrapf(err, "create cdn client for scheduler") } mgr := &manager{ + cdnAddrs: cdnAddrs, client: cdnClient, peerManager: peerManager, hostManager: hostManager, @@ -80,72 +83,92 @@ func cdnHostsToNetAddrs(hosts []*config.CDN) []dfnet.NetAddr { } func (cm *manager) OnNotify(c *config.DynconfigData) { + netAddrs := cdnHostsToNetAddrs(c.CDNs) + if reflect.DeepEqual(netAddrs, c.CDNs) { + return + } cm.lock.Lock() defer cm.lock.Unlock() // Sync CDNManager client netAddrs - cm.client.UpdateState(cdnHostsToNetAddrs(c.CDNs)) + cm.cdnAddrs = netAddrs + cm.client.UpdateState(netAddrs) } -func (cm *manager) StartSeedTask(ctx context.Context, task *types.Task) error { - if cm.client == nil { - return ErrCDNRegisterFail - } - // TODO 这个地方必须重新生成一个ctx,不能使用传递进来的参数,需要排查下原因 - stream, err := cm.client.ObtainSeeds(context.Background(), &cdnsystem.SeedRequest{ +func (cm *manager) StartSeedTask(ctx context.Context, task *types.Task) (*types.Peer, error) { + var seedSpan trace.Span + ctx, seedSpan = tracer.Start(ctx, config.SpanTriggerCDN, trace.WithSpanKind(trace.SpanKindClient)) + defer seedSpan.End() + seedRequest := &cdnsystem.SeedRequest{ TaskId: task.TaskID, Url: task.URL, UrlMeta: task.URLMeta, - }) + } + seedSpan.SetAttributes(config.AttributeCDNSeedRequest.String(seedRequest.String())) + if cm.client == nil { + err := cdn.ErrCDNRegisterFail + seedSpan.RecordError(err) + seedSpan.SetAttributes(config.AttributePeerDownloadSuccess.Bool(false)) + return nil, err + } + stream, err := cm.client.ObtainSeeds(context.Background(), seedRequest) if err != nil { + seedSpan.RecordError(err) + seedSpan.SetAttributes(config.AttributePeerDownloadSuccess.Bool(false)) if cdnErr, ok := err.(*dferrors.DfError); ok { logger.Errorf("failed to obtain cdn seed: %v", cdnErr) switch cdnErr.Code { case dfcodes.CdnTaskRegistryFail: - return errors.Wrap(ErrCDNRegisterFail, "obtain seeds") + return nil, errors.Wrap(cdn.ErrCDNRegisterFail, "obtain seeds") case dfcodes.CdnTaskDownloadFail: - return errors.Wrapf(ErrCDNDownloadFail, "obtain seeds") + return nil, errors.Wrapf(cdn.ErrCDNDownloadFail, "obtain seeds") default: - return errors.Wrapf(ErrCDNUnknown, "obtain seeds") + return nil, errors.Wrapf(cdn.ErrCDNUnknown, "obtain seeds") } } - return errors.Wrapf(ErrCDNInvokeFail, "obtain seeds from cdn: %v", err) + return nil, errors.Wrapf(cdn.ErrCDNInvokeFail, "obtain seeds from cdn: %v", err) } - return cm.receivePiece(task, stream) + return cm.receivePiece(ctx, task, stream) } -func (cm *manager) receivePiece(task *types.Task, stream *client.PieceSeedStream) error { +func (cm *manager) receivePiece(ctx context.Context, task *types.Task, stream *client.PieceSeedStream) (*types.Peer, error) { + span := 
trace.SpanFromContext(ctx) var initialized bool var cdnPeer *types.Peer for { piece, err := stream.Recv() if err == io.EOF { if task.GetStatus() == types.TaskStatusSuccess { - return nil + span.SetAttributes(config.AttributePeerDownloadSuccess.Bool(true)) + return cdnPeer, nil } - return errors.Errorf("cdn stream receive EOF but task status is %s", task.GetStatus()) + return cdnPeer, errors.Errorf("cdn stream receive EOF but task status is %s", task.GetStatus()) } if err != nil { + span.RecordError(err) + span.SetAttributes(config.AttributePeerDownloadSuccess.Bool(false)) if recvErr, ok := err.(*dferrors.DfError); ok { + span.RecordError(recvErr) switch recvErr.Code { case dfcodes.CdnTaskRegistryFail: - return errors.Wrapf(ErrCDNRegisterFail, "receive piece") + return cdnPeer, errors.Wrapf(cdn.ErrCDNRegisterFail, "receive piece") case dfcodes.CdnTaskDownloadFail: - return errors.Wrapf(ErrCDNDownloadFail, "receive piece") + return cdnPeer, errors.Wrapf(cdn.ErrCDNDownloadFail, "receive piece") default: - return errors.Wrapf(ErrCDNUnknown, "recive piece") + return cdnPeer, errors.Wrapf(cdn.ErrCDNUnknown, "recive piece") } } - return errors.Wrapf(ErrCDNInvokeFail, "receive piece from cdn: %v", err) + return cdnPeer, errors.Wrapf(cdn.ErrCDNInvokeFail, "receive piece from cdn: %v", err) } if piece != nil { + span.AddEvent(config.EventPieceReceived, trace.WithAttributes(config.AttributePieceReceived.String(piece.String()))) if !initialized { - cdnPeer, err = cm.initCdnPeer(task, piece) + cdnPeer, err = cm.initCdnPeer(ctx, task, piece) + task.SetStatus(types.TaskStatusSeeding) initialized = true } if err != nil || cdnPeer == nil { - return err + return cdnPeer, err } - task.SetStatus(types.TaskStatusSeeding) cdnPeer.Touch() if piece.Done { task.PieceTotal = piece.TotalPieceCount @@ -153,12 +176,14 @@ func (cm *manager) receivePiece(task *types.Task, stream *client.PieceSeedStream task.SetStatus(types.TaskStatusSuccess) cdnPeer.SetStatus(types.PeerStatusSuccess) if task.ContentLength <= types.TinyFileSize { - content, er := cm.DownloadTinyFileContent(task, cdnPeer.Host) + content, er := cm.DownloadTinyFileContent(ctx, task, cdnPeer.Host) if er == nil && len(content) == int(task.ContentLength) { task.DirectPiece = content } } - return nil + span.SetAttributes(config.AttributePeerDownloadSuccess.Bool(true)) + span.SetAttributes(config.AttributeContentLength.Int64(task.ContentLength)) + return cdnPeer, nil } cdnPeer.AddPieceInfo(piece.PieceInfo.PieceNum+1, 0) task.AddPiece(&types.PieceInfo{ @@ -173,7 +198,9 @@ func (cm *manager) receivePiece(task *types.Task, stream *client.PieceSeedStream } } -func (cm *manager) initCdnPeer(task *types.Task, ps *cdnsystem.PieceSeed) (*types.Peer, error) { +func (cm *manager) initCdnPeer(ctx context.Context, task *types.Task, ps *cdnsystem.PieceSeed) (*types.Peer, error) { + span := trace.SpanFromContext(ctx) + span.AddEvent(config.EventCreatePeer) var ok bool var cdnHost *types.PeerHost cdnPeer, ok := cm.peerManager.Get(ps.PeerId) @@ -181,7 +208,7 @@ func (cm *manager) initCdnPeer(task *types.Task, ps *cdnsystem.PieceSeed) (*type logger.Debugf("first seed cdn task for taskID %s", task.TaskID) if cdnHost, ok = cm.hostManager.Get(ps.HostUuid); !ok { logger.Errorf("cannot find host %s", ps.HostUuid) - return nil, errors.Wrapf(ErrInitCDNPeerFail, "cannot find host %s", ps.HostUuid) + return nil, errors.Wrapf(cdn.ErrInitCDNPeerFail, "cannot find host %s", ps.HostUuid) } cdnPeer = types.NewPeer(ps.PeerId, task, cdnHost) } @@ -190,12 +217,14 @@ func (cm *manager) 
initCdnPeer(task *types.Task, ps *cdnsystem.PieceSeed) (*type return cdnPeer, nil } -func (cm *manager) DownloadTinyFileContent(task *types.Task, cdnHost *types.PeerHost) ([]byte, error) { +func (cm *manager) DownloadTinyFileContent(ctx context.Context, task *types.Task, cdnHost *types.PeerHost) ([]byte, error) { + span := trace.SpanFromContext(ctx) // no need to invoke getPieceTasks method // TODO download the tiny file // http://host:port/download/{taskId 前3位}/{taskId}?peerId={peerId}; url := fmt.Sprintf("http://%s:%d/download/%s/%s?peerId=scheduler", cdnHost.IP, cdnHost.DownloadPort, task.TaskID[:3], task.TaskID) + span.SetAttributes(config.AttributeDownloadFileURL.String(url)) response, err := http.Get(url) if err != nil { return nil, err diff --git a/scheduler/daemon/cdn/error.go b/scheduler/daemon/cdn/error.go index d92e1224047..a06e454e678 100644 --- a/scheduler/daemon/cdn/error.go +++ b/scheduler/daemon/cdn/error.go @@ -15,3 +15,17 @@ */ package cdn + +import "github.com/pkg/errors" + +var ( + ErrCDNRegisterFail = errors.New("cdn task register failed") + + ErrCDNDownloadFail = errors.New("cdn task download failed") + + ErrCDNUnknown = errors.New("cdn obtain seed encounter unknown err") + + ErrCDNInvokeFail = errors.New("invoke cdn interface failed") + + ErrInitCDNPeerFail = errors.New("init cdn peer failed") +) diff --git a/scheduler/daemon/cdn/source/manager.go b/scheduler/daemon/cdn/source/manager.go index 1e6375b7fd0..a91ddc5bf13 100644 --- a/scheduler/daemon/cdn/source/manager.go +++ b/scheduler/daemon/cdn/source/manager.go @@ -25,18 +25,47 @@ import ( ) type manager struct { + peerManager daemon.PeerMgr + hostManager daemon.HostMgr } -func (m manager) OnNotify(dynconfigData *config.DynconfigData) { - panic("implement me") +func NewManager(peerManager daemon.PeerMgr, hostManager daemon.HostMgr) (daemon.CDNMgr, error) { + mgr := &manager{ + peerManager: peerManager, + hostManager: hostManager, + } + return mgr, nil } -func (m manager) StartSeedTask(ctx context.Context, task *types.Task) error { +func (m manager) OnNotify(data *config.DynconfigData) { panic("implement me") } -func NewManager() daemon.CDNMgr { - return &manager{} +func (m manager) StartSeedTask(ctx context.Context, task *types.Task) (*types.Peer, error) { + //stream, err := cm.client.ObtainSeeds(context.Background(), &cdnsystem.SeedRequest{ + // TaskId: task.TaskID, + // Url: task.URL, + // UrlMeta: task.URLMeta, + //}) + //if err != nil { + // if cdnErr, ok := err.(*dferrors.DfError); ok { + // logger.Errorf("failed to obtain cdn seed: %v", cdnErr) + // switch cdnErr.Code { + // case dfcodes.CdnTaskRegistryFail: + // return errors.Wrap(cdn.ErrCDNRegisterFail, "obtain seeds") + // case dfcodes.CdnTaskDownloadFail: + // return errors.Wrapf(cdn.ErrCDNDownloadFail, "obtain seeds") + // default: + // return errors.Wrapf(cdn.ErrCDNUnknown, "obtain seeds") + // } + // } + // return errors.Wrapf(cdn.ErrCDNInvokeFail, "obtain seeds from cdn: %v", err) + //} + //return cm.receivePiece(task, stream) + //source.GetContentLength(context.Background(), task.URL, nil) + //task.ListPeers() + //task.SetStatus(types.TaskStatusSuccess) + return nil, nil } var _ daemon.CDNMgr = (*manager)(nil) diff --git a/scheduler/daemon/cdn_mgr.go b/scheduler/daemon/cdn_mgr.go index f517bbda0ad..dfec2582b07 100644 --- a/scheduler/daemon/cdn_mgr.go +++ b/scheduler/daemon/cdn_mgr.go @@ -25,6 +25,6 @@ import ( type CDNMgr interface { config.Observer - // StartSeedTask - StartSeedTask(ctx context.Context, task *types.Task) error + // StartSeedTask 
start seed cdn task + StartSeedTask(ctx context.Context, task *types.Task) (*types.Peer, error) } diff --git a/scheduler/daemon/peer/manager.go b/scheduler/daemon/peer/manager.go index d450f486b29..d97b805de24 100644 --- a/scheduler/daemon/peer/manager.go +++ b/scheduler/daemon/peer/manager.go @@ -31,7 +31,21 @@ type manager struct { hostManager daemon.HostMgr cleanupExpiredPeerTicker *time.Ticker peerTTL time.Duration + peerTTI time.Duration peerMap sync.Map + lock sync.RWMutex +} + +func (m *manager) ListPeersByTask(taskID string) []*types.Peer { + var peers []*types.Peer + m.peerMap.Range(func(key, value interface{}) bool { + peer := value.(*types.Peer) + if peer.Task.TaskID == taskID { + peers = append(peers, peer) + } + return true + }) + return peers } func (m *manager) ListPeers() *sync.Map { @@ -43,6 +57,7 @@ func NewManager(cfg *config.GCConfig, hostManager daemon.HostMgr) daemon.PeerMgr hostManager: hostManager, cleanupExpiredPeerTicker: time.NewTicker(cfg.PeerGCInterval), peerTTL: cfg.PeerTTL, + peerTTI: cfg.PeerTTI, } go m.cleanupPeers() return m @@ -51,6 +66,8 @@ func NewManager(cfg *config.GCConfig, hostManager daemon.HostMgr) daemon.PeerMgr var _ daemon.PeerMgr = (*manager)(nil) func (m *manager) Add(peer *types.Peer) { + m.lock.Lock() + defer m.lock.Unlock() peer.Host.AddPeer(peer) peer.Task.AddPeer(peer) m.peerMap.Store(peer.PeerID, peer) @@ -66,27 +83,19 @@ func (m *manager) Get(peerID string) (*types.Peer, bool) { } func (m *manager) Delete(peerID string) { + m.lock.Lock() + defer m.lock.Unlock() peer, ok := m.Get(peerID) if ok { peer.Host.DeletePeer(peerID) peer.Task.DeletePeer(peer) peer.UnBindSendChannel() + peer.ReplaceParent(nil) m.peerMap.Delete(peerID) } return } -func (m *manager) ListPeersByTask(taskID string) (peers []*types.Peer) { - m.peerMap.Range(func(key, value interface{}) bool { - peer := value.(*types.Peer) - if peer.Task.TaskID == taskID { - peers = append(peers, peer) - } - return true - }) - return -} - func (m *manager) Pick(task *types.Task, limit int, pickFn func(peer *types.Peer) bool) (pickedPeers []*types.Peer) { return m.pick(task, limit, false, pickFn) } @@ -129,12 +138,16 @@ func (m *manager) cleanupPeers() { for range m.cleanupExpiredPeerTicker.C { m.peerMap.Range(func(key, value interface{}) bool { peer := value.(*types.Peer) - if time.Now().Sub(peer.GetLastAccessTime()) > m.peerTTL && !peer.IsDone() { - logger.Debugf("peer %s has been more than %s since last access, set status to zombie", peer.PeerID, m.peerTTL) + elapse := time.Since(peer.GetLastAccessTime()) + if elapse > m.peerTTI && !peer.IsDone() { + if !peer.IsBindSendChannel() { + peer.MarkLeave() + } + logger.Debugf("peer %s has been more than %s since last access, set status to zombie", peer.PeerID, m.peerTTI) peer.SetStatus(types.PeerStatusZombie) } - if peer.IsLeave() { - logger.Debugf("delete peer %s", peer.PeerID) + if peer.IsLeave() || peer.IsFail() || elapse > m.peerTTL { + logger.Debugf("delete peer %s because %s have passed since last access", peer.PeerID) m.Delete(key.(string)) if !peer.Host.CDN && peer.Host.GetPeerTaskNum() == 0 { m.hostManager.Delete(peer.Host.UUID) diff --git a/scheduler/daemon/peer_mgr.go b/scheduler/daemon/peer_mgr.go index d22ca27080b..9f9d621446b 100644 --- a/scheduler/daemon/peer_mgr.go +++ b/scheduler/daemon/peer_mgr.go @@ -29,10 +29,10 @@ type PeerMgr interface { Delete(peerID string) - ListPeers() *sync.Map - ListPeersByTask(taskID string) []*types.Peer + ListPeers() *sync.Map + Pick(task *types.Task, limit int, pickFn func(pt *types.Peer) 
bool) []*types.Peer PickReverse(task *types.Task, limit int, pickFn func(peer *types.Peer) bool) []*types.Peer diff --git a/scheduler/daemon/task/manager.go b/scheduler/daemon/task/manager.go index 129c5c3c735..fe93b52d7d5 100644 --- a/scheduler/daemon/task/manager.go +++ b/scheduler/daemon/task/manager.go @@ -26,15 +26,19 @@ import ( ) type manager struct { + peerManager daemon.PeerMgr cleanupExpiredTaskTicker *time.Ticker taskTTL time.Duration + taskTTI time.Duration taskMap sync.Map } -func NewManager(cfg *config.GCConfig) daemon.TaskMgr { +func NewManager(cfg *config.GCConfig, peerManager daemon.PeerMgr) daemon.TaskMgr { m := &manager{ + peerManager: peerManager, cleanupExpiredTaskTicker: time.NewTicker(cfg.TaskGCInterval), taskTTL: cfg.TaskTTL, + taskTTI: cfg.TaskTTI, } go m.cleanupTasks() return m @@ -70,8 +74,18 @@ func (m *manager) cleanupTasks() { for range m.cleanupExpiredTaskTicker.C { m.taskMap.Range(func(key, value interface{}) bool { task := value.(*types.Task) - if time.Now().Sub(task.GetLastAccessTime()) > m.taskTTL { - m.Delete(key.(string)) + elapse := time.Since(task.GetLastAccessTime()) + if elapse > m.taskTTI && task.IsSuccess() { + task.SetStatus(types.TaskStatusZombie) + } + if elapse > m.taskTTL { + taskID := key.(string) + // TODO lock + m.Delete(taskID) + peers := m.peerManager.ListPeersByTask(taskID) + for _, peer := range peers { + m.peerManager.Delete(peer.PeerID) + } } return true }) diff --git a/scheduler/daemon/task_mgr.go b/scheduler/daemon/task_mgr.go index c7b0e52e003..057029fb3f2 100644 --- a/scheduler/daemon/task_mgr.go +++ b/scheduler/daemon/task_mgr.go @@ -21,13 +21,10 @@ import ( ) type TaskMgr interface { - // Add Add(task *types.Task) - // Get Get(taskID string) (task *types.Task, ok bool) - // Delete Delete(taskID string) GetOrAdd(task *types.Task) (actual *types.Task, loaded bool) diff --git a/scheduler/server/server.go b/scheduler/server.go similarity index 91% rename from scheduler/server/server.go rename to scheduler/server.go index fb4c43bebf8..62bd257f655 100644 --- a/scheduler/server/server.go +++ b/scheduler/server.go @@ -14,13 +14,15 @@ * limitations under the License. 
*/ -package server +package scheduler import ( "context" "time" + server2 "d7y.io/dragonfly/v2/scheduler/server" "d7y.io/dragonfly/v2/scheduler/tasks" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "d7y.io/dragonfly/v2/cmd/dependency" logger "d7y.io/dragonfly/v2/internal/dflog" @@ -29,7 +31,6 @@ import ( "d7y.io/dragonfly/v2/pkg/rpc/manager" "d7y.io/dragonfly/v2/pkg/rpc/scheduler/server" "d7y.io/dragonfly/v2/scheduler/core" - "d7y.io/dragonfly/v2/scheduler/server/service" "github.com/pkg/errors" "google.golang.org/grpc" @@ -100,7 +101,7 @@ func New(cfg *config.Config) (*Server, error) { } s.schedulerService = schedulerService // Initialize scheduler service - s.schedulerServer, err = service.NewSchedulerServer(schedulerService) + s.schedulerServer, err = server2.NewSchedulerServer(schedulerService) if err != nil { return nil, err @@ -144,7 +145,11 @@ func (s *Server) Serve() error { } logger.Infof("start server at port %d", port) - if err := rpc.StartTCPServer(port, port, s.schedulerServer); err != nil { + var opts []grpc.ServerOption + if s.config.Options.Telemetry.Jaeger != "" { + opts = append(opts, grpc.ChainUnaryInterceptor(otelgrpc.UnaryServerInterceptor()), grpc.ChainStreamInterceptor(otelgrpc.StreamServerInterceptor())) + } + if err := rpc.StartTCPServer(port, port, s.schedulerServer, opts...); err != nil { return err } return nil diff --git a/scheduler/server/service/scheduler_server.go b/scheduler/server/scheduler_server.go similarity index 64% rename from scheduler/server/service/scheduler_server.go rename to scheduler/server/scheduler_server.go index 9aa0c0eb881..041d30e85cd 100644 --- a/scheduler/server/service/scheduler_server.go +++ b/scheduler/server/scheduler_server.go @@ -14,7 +14,7 @@ * limitations under the License. 
*/ -package service +package server import ( "context" @@ -26,17 +26,27 @@ import ( "d7y.io/dragonfly/v2/internal/dferrors" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/rpc/base" + "d7y.io/dragonfly/v2/pkg/rpc/base/common" "d7y.io/dragonfly/v2/pkg/rpc/scheduler" "d7y.io/dragonfly/v2/pkg/rpc/scheduler/server" "d7y.io/dragonfly/v2/pkg/util/net/urlutils" "d7y.io/dragonfly/v2/pkg/util/stringutils" + "d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/core" "d7y.io/dragonfly/v2/scheduler/types" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" "golang.org/x/sync/errgroup" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) +var tracer trace.Tracer + +func init() { + tracer = otel.Tracer("scheduler-server") +} + type SchedulerServer struct { service *core.SchedulerService } @@ -50,26 +60,34 @@ func NewSchedulerServer(service *core.SchedulerService) (server.SchedulerServer, func (s *SchedulerServer) RegisterPeerTask(ctx context.Context, request *scheduler.PeerTaskRequest) (resp *scheduler.RegisterResult, err error) { defer func() { - logger.Debugf("peer %s register result %v", request.PeerId, resp) + logger.Debugf("peer %s register result %v, err: %v", request.PeerId, resp, err) }() + var span trace.Span + ctx, span = tracer.Start(ctx, config.SpanPeerRegister, trace.WithSpanKind(trace.SpanKindServer)) + span.SetAttributes(config.AttributePeerRegisterRequest.String(request.String())) + defer span.End() logger.Debugf("register peer task, req: %+v", request) resp = new(scheduler.RegisterResult) if verifyErr := validateParams(request); verifyErr != nil { err = dferrors.Newf(dfcodes.BadRequest, "bad request param: %v", verifyErr) logger.Errorf("validate register request failed: %v", err) + span.RecordError(err) return } taskID := s.service.GenerateTaskID(request.Url, request.UrlMeta, request.PeerId) - task := types.NewTask(taskID, request.Url, request.UrlMeta.Filter, request.UrlMeta) + span.SetAttributes(config.AttributeTaskID.String(taskID)) + task := types.NewTask(taskID, request.Url, request.UrlMeta) task, err = s.service.GetOrCreateTask(ctx, task) if err != nil { err = dferrors.Newf(dfcodes.SchedCDNSeedFail, "create task failed: %v", err) logger.Errorf("get or create task failed: %v", err) + span.RecordError(err) return } if task.IsFail() { err = dferrors.Newf(dfcodes.SchedTaskStatusError, "task status is %s", task.GetStatus()) logger.Errorf("task status is unhealthy, task status is: %s", task.GetStatus()) + span.RecordError(err) return } resp.SizeScope = getTaskSizeScope(task) @@ -79,50 +97,62 @@ func (s *SchedulerServer) RegisterPeerTask(ctx context.Context, request *schedul resp.DirectPiece = &scheduler.RegisterResult_PieceContent{ PieceContent: task.DirectPiece, } + span.SetAttributes(config.AttributeTaskSizeScope.String(resp.SizeScope.String())) return case base.SizeScope_SMALL: + span.SetAttributes(config.AttributeTaskSizeScope.String(resp.SizeScope.String())) peer, regErr := s.service.RegisterPeerTask(request, task) if regErr != nil { err = dferrors.Newf(dfcodes.SchedPeerRegisterFail, "failed to register peer: %v", regErr) + span.RecordError(err) return } parent, schErr := s.service.ScheduleParent(peer) if schErr != nil { + span.SetAttributes(config.AttributeTaskSizeScope.String(base.SizeScope_NORMAL.String())) + span.AddEvent(config.EventScheduleParentFail) resp.SizeScope = base.SizeScope_NORMAL resp.TaskId = taskID //err = dferrors.Newf(dfcodes.SchedPeerScheduleFail, "failed to schedule peer %v: %v", peer.PeerID, schErr) 
return } - singlePiece := task.GetPiece(0) - resp.DirectPiece = &scheduler.RegisterResult_SinglePiece{ - SinglePiece: &scheduler.SinglePiece{ - DstPid: parent.PeerID, - DstAddr: fmt.Sprintf("%s:%d", parent.Host.IP, parent.Host.DownloadPort), - PieceInfo: &base.PieceInfo{ - PieceNum: singlePiece.PieceNum, - RangeStart: singlePiece.RangeStart, - RangeSize: singlePiece.RangeSize, - PieceMd5: singlePiece.PieceMd5, - PieceOffset: singlePiece.PieceOffset, - PieceStyle: singlePiece.PieceStyle, - }, + firstPiece := task.GetPiece(0) + singlePiece := &scheduler.SinglePiece{ + DstPid: parent.PeerID, + DstAddr: fmt.Sprintf("%s:%d", parent.Host.IP, parent.Host.DownloadPort), + PieceInfo: &base.PieceInfo{ + PieceNum: firstPiece.PieceNum, + RangeStart: firstPiece.RangeStart, + RangeSize: firstPiece.RangeSize, + PieceMd5: firstPiece.PieceMd5, + PieceOffset: firstPiece.PieceOffset, + PieceStyle: firstPiece.PieceStyle, }, } + resp.DirectPiece = &scheduler.RegisterResult_SinglePiece{ + SinglePiece: singlePiece, + } + span.SetAttributes(config.AttributeSinglePiece.String(singlePiece.String())) return default: + span.SetAttributes(config.AttributeTaskSizeScope.String(resp.SizeScope.String())) _, regErr := s.service.RegisterPeerTask(request, task) if regErr != nil { err = dferrors.Newf(dfcodes.SchedPeerRegisterFail, "failed to register peer: %v", regErr) + span.RecordError(regErr) } return } } func (s *SchedulerServer) ReportPieceResult(stream scheduler.Scheduler_ReportPieceResultServer) error { + var span trace.Span + ctx, span := tracer.Start(stream.Context(), config.SpanReportPieceResult, trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() peerPacketChan := make(chan *scheduler.PeerPacket, 1) var peer *types.Peer - var initialized bool - ctx, cancel := context.WithCancel(stream.Context()) + initialized := false + ctx, cancel := context.WithCancel(ctx) g, ctx := errgroup.WithContext(ctx) var once sync.Once g.Go(func() error { @@ -137,10 +167,12 @@ func (s *SchedulerServer) ReportPieceResult(stream scheduler.Scheduler_ReportPie default: pieceResult, err := stream.Recv() if err == io.EOF { + span.AddEvent("report piece process exited because client has terminated sending the request") return nil } if err != nil { if status.Code(err) == codes.Canceled { + span.AddEvent("report piece process exited because an error exception was received") if peer != nil { logger.Info("peer %s canceled", peer.PeerID) return nil @@ -158,8 +190,13 @@ func (s *SchedulerServer) ReportPieceResult(stream scheduler.Scheduler_ReportPie peer.BindSendChannel(peerPacketChan) peer.SetStatus(types.PeerStatusRunning) initialized = true + span.SetAttributes(config.AttributePeerID.String(peer.PeerID)) + span.AddEvent("init") + } + if pieceResult.PieceNum == common.EndOfPiece { + return nil } - if err := s.service.HandlePieceResult(peer, pieceResult); err != nil { + if err := s.service.HandlePieceResult(ctx, peer, pieceResult); err != nil { logger.Errorf("handle piece result %v fail: %v", pieceResult, err) } } @@ -177,8 +214,10 @@ func (s *SchedulerServer) ReportPieceResult(stream scheduler.Scheduler_ReportPie return nil case pp, ok := <-peerPacketChan: if !ok { + span.AddEvent("exit report piece process due to send channel has closed") return nil } + span.AddEvent("schedule event", trace.WithAttributes(config.AttributeSchedulePacket.String(pp.String()))) err := stream.Send(pp) if err != nil { logger.Errorf("send peer %s schedule packet %v failed: %v", pp.SrcPid, pp, err) @@ -194,22 +233,36 @@ func (s *SchedulerServer) 
ReportPieceResult(stream scheduler.Scheduler_ReportPie func (s *SchedulerServer) ReportPeerResult(ctx context.Context, result *scheduler.PeerResult) (err error) { logger.Debugf("report peer result %+v", result) + var span trace.Span + ctx, span = tracer.Start(ctx, config.SpanReportPeerResult, trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + span.SetAttributes(config.AttributeReportPeerID.String(result.PeerId)) + span.SetAttributes(config.AttributePeerDownloadSuccess.Bool(result.Success)) + span.SetAttributes(config.AttributePeerDownloadResult.String(result.String())) peer, ok := s.service.GetPeerTask(result.PeerId) if !ok { logger.Warnf("report peer result: peer %s is not exists", result.PeerId) - return dferrors.Newf(dfcodes.SchedPeerNotFound, "peer %s not found", result.PeerId) + err = dferrors.Newf(dfcodes.SchedPeerNotFound, "peer %s not found", result.PeerId) + span.RecordError(err) } - return s.service.HandlePeerResult(peer, result) + return s.service.HandlePeerResult(ctx, peer, result) } func (s *SchedulerServer) LeaveTask(ctx context.Context, target *scheduler.PeerTarget) (err error) { + var span trace.Span + ctx, span = tracer.Start(ctx, config.SpanPeerLeave, trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + span.SetAttributes(config.AttributeLeavePeerID.String(target.PeerId)) + span.SetAttributes(config.AttributeLeaveTaskID.String(target.TaskId)) logger.Debugf("leave task %+v", target) peer, ok := s.service.GetPeerTask(target.PeerId) if !ok { logger.Warnf("leave task: peer %s is not exists", target.PeerId) - return dferrors.Newf(dfcodes.SchedPeerNotFound, "peer %s not found", target.PeerId) + err = dferrors.Newf(dfcodes.SchedPeerNotFound, "peer %s not found", target.PeerId) + span.RecordError(err) + return } - return s.service.HandleLeaveTask(peer) + return s.service.HandleLeaveTask(ctx, peer) } // validateParams validates the params of scheduler.PeerTaskRequest. diff --git a/scheduler/tasks/tasks.go b/scheduler/tasks/tasks.go index 27e2ae761fc..f5e29c67633 100644 --- a/scheduler/tasks/tasks.go +++ b/scheduler/tasks/tasks.go @@ -158,7 +158,7 @@ func (t *tasks) preheat(req string) error { taskID := idgen.TaskID(request.URL, meta) logger.Debugf("ready to preheat \"%s\", taskID = %s", request.URL, taskID) - task := types.NewTask(taskID, request.URL, request.Filter, meta) + task := types.NewTask(taskID, request.URL, meta) task, err := t.service.GetOrCreateTask(t.ctx, task) if err != nil { return dferrors.Newf(dfcodes.SchedCDNSeedFail, "create task failed: %v", err) diff --git a/scheduler/types/host.go b/scheduler/types/host.go index c49f865a47e..00f10bc3a80 100644 --- a/scheduler/types/host.go +++ b/scheduler/types/host.go @@ -18,6 +18,8 @@ package types import ( "sync" + + "go.uber.org/atomic" ) type PeerHost struct { @@ -43,23 +45,23 @@ type PeerHost struct { // NetTopology network device path: switch|router|... 
NetTopology string // TODO TotalUploadLoad currentUploadLoad decided by real time client report host info - TotalUploadLoad int - currentUploadLoad int + TotalUploadLoad int32 + currentUploadLoad atomic.Int32 peerMap map[string]*Peer } func NewClientPeerHost(uuid, ip, hostname string, rpcPort, downloadPort int32, securityDomain, location, idc, netTopology string, - totalUploadLoad int) *PeerHost { + totalUploadLoad int32) *PeerHost { return newPeerHost(uuid, ip, hostname, rpcPort, downloadPort, false, securityDomain, location, idc, netTopology, totalUploadLoad) } func NewCDNPeerHost(uuid, ip, hostname string, rpcPort, downloadPort int32, securityDomain, location, idc, netTopology string, - totalUploadLoad int) *PeerHost { + totalUploadLoad int32) *PeerHost { return newPeerHost(uuid, ip, hostname, rpcPort, downloadPort, true, securityDomain, location, idc, netTopology, totalUploadLoad) } func newPeerHost(uuid, ip, hostname string, rpcPort, downloadPort int32, isCDN bool, securityDomain, location, idc, netTopology string, - totalUploadLoad int) *PeerHost { + totalUploadLoad int32) *PeerHost { return &PeerHost{ UUID: uuid, IP: ip, @@ -101,37 +103,25 @@ func (h *PeerHost) GetPeer(peerID string) (*Peer, bool) { return peer, ok } -func (h *PeerHost) GetCurrentUpload() int { - h.lock.RLock() - defer h.lock.RUnlock() - return h.currentUploadLoad +func (h *PeerHost) GetCurrentUpload() int32 { + return h.currentUploadLoad.Load() } func (h *PeerHost) GetUploadLoadPercent() float64 { - h.lock.RLock() - defer h.lock.RUnlock() if h.TotalUploadLoad <= 0 { return 1.0 } - return float64(h.currentUploadLoad) / float64(h.TotalUploadLoad) + return float64(h.currentUploadLoad.Load()) / float64(h.TotalUploadLoad) } func (h *PeerHost) GetFreeUploadLoad() int { - h.lock.RLock() - defer h.lock.RUnlock() - return h.TotalUploadLoad - h.currentUploadLoad + return int(h.TotalUploadLoad - h.currentUploadLoad.Load()) } -func (h *PeerHost) IncUploadLoad() int { - h.lock.Lock() - defer h.lock.Unlock() - h.currentUploadLoad++ - return h.currentUploadLoad +func (h *PeerHost) IncUploadLoad() int32 { + return h.currentUploadLoad.Inc() } -func (h *PeerHost) DecUploadLoad() int { - h.lock.Lock() - defer h.lock.Unlock() - h.currentUploadLoad-- - return h.currentUploadLoad +func (h *PeerHost) DecUploadLoad() int32 { + return h.currentUploadLoad.Dec() } diff --git a/scheduler/types/peer.go b/scheduler/types/peer.go index 28ce9712b3a..c1d8d9a3e4b 100644 --- a/scheduler/types/peer.go +++ b/scheduler/types/peer.go @@ -20,6 +20,7 @@ import ( "sync" "time" + logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/rpc/scheduler" "go.uber.org/atomic" ) @@ -35,9 +36,9 @@ func (status PeerStatus) String() string { case PeerStatusSuccess: return "Success" case PeerStatusFail: - return "fail" + return "Fail" case PeerStatusZombie: - return "zombie" + return "Zombie" default: return "unknown" } @@ -86,10 +87,8 @@ func NewPeer(peerID string, task *Task, host *PeerHost) *Peer { } } +// TODO: remove func (peer *Peer) GetWholeTreeNode() int { - // TODO lock task - peer.lock.RLock() - defer peer.lock.RUnlock() count := 1 peer.children.Range(func(key, value interface{}) bool { peerNode := value.(*Peer) @@ -116,18 +115,14 @@ func (peer *Peer) Touch() { } func (peer *Peer) associateChild(child *Peer) { - peer.lock.Lock() peer.children.Store(child.PeerID, child) - peer.Host.IncUploadLoad() - peer.lock.Unlock() + logger.Errorf("%s:%s", peer.Host.HostName, peer.Host.IncUploadLoad()) peer.Task.peers.Update(peer) } func (peer *Peer) 
disassociateChild(child *Peer) { - peer.lock.Lock() peer.children.Delete(child.PeerID) peer.Host.DecUploadLoad() - peer.lock.Unlock() peer.Task.peers.Update(peer) } @@ -202,19 +197,19 @@ func (peer *Peer) GetTreeRoot() *Peer { return node } -// if peer is offspring of ancestor +// IsDescendantOf if peer is offspring of ancestor func (peer *Peer) IsDescendantOf(ancestor *Peer) bool { - if ancestor == nil { + return isDescendant(ancestor, peer) +} + +func isDescendant(ancestor, offspring *Peer) bool { + if ancestor == nil || offspring == nil { return false } // TODO avoid circulation - peer.lock.RLock() - ancestor.lock.RLock() - defer ancestor.lock.RUnlock() - defer peer.lock.RUnlock() - node := peer + node := offspring for node != nil { - if node.parent == nil || node.Host.CDN { + if node.parent == nil { return false } else if node.PeerID == ancestor.PeerID { return true @@ -224,36 +219,14 @@ func (peer *Peer) IsDescendantOf(ancestor *Peer) bool { return false } +// IsAncestorOf if offspring is offspring of peer func (peer *Peer) IsAncestorOf(offspring *Peer) bool { - if offspring == nil { - return false - } - offspring.lock.RLock() - peer.lock.RLock() - defer peer.lock.RUnlock() - defer offspring.lock.RUnlock() - node := offspring - for node != nil { - if node.parent == nil || node.Host.CDN { - return false - } else if node.PeerID == peer.PeerID { - return true - } - node = node.parent - } - return false + return isDescendant(peer, offspring) } -func (peer *Peer) IsBlocking() bool { +func (peer *Peer) GetSortKeys() (key1, key2 int) { peer.lock.RLock() defer peer.lock.RUnlock() - if peer.parent == nil { - return false - } - return peer.finishedNum.Load() >= peer.parent.finishedNum.Load() -} - -func (peer *Peer) GetSortKeys() (key1, key2 int) { key1 = int(peer.finishedNum.Load()) key2 = peer.getFreeLoad() return @@ -266,12 +239,8 @@ func (peer *Peer) getFreeLoad() int { return peer.Host.GetFreeUploadLoad() } -func GetDiffPieceNum(src *Peer, dst *Peer) int32 { - diff := src.finishedNum.Load() - dst.finishedNum.Load() - if diff > 0 { - return diff - } - return -diff +func GetDiffPieceNum(dst *Peer, src *Peer) int32 { + return dst.finishedNum.Load() - src.finishedNum.Load() } func (peer *Peer) GetParent() *Peer { @@ -286,12 +255,6 @@ func (peer *Peer) GetChildren() *sync.Map { return &peer.children } -func (peer *Peer) SetStatus(status PeerStatus) { - peer.lock.Lock() - defer peer.lock.Unlock() - peer.status = status -} - func (peer *Peer) BindSendChannel(packetChan chan *scheduler.PeerPacket) { peer.lock.Lock() defer peer.lock.Unlock() @@ -310,6 +273,12 @@ func (peer *Peer) UnBindSendChannel() { } } +func (peer *Peer) IsBindSendChannel() bool { + peer.lock.RLock() + defer peer.lock.RUnlock() + return peer.bindPacketChan +} + func (peer *Peer) SendSchedulePacket(packet *scheduler.PeerPacket) { peer.lock.Lock() defer peer.lock.Unlock() @@ -318,6 +287,30 @@ func (peer *Peer) SendSchedulePacket(packet *scheduler.PeerPacket) { } } +func (peer *Peer) SetStatus(status PeerStatus) { + peer.lock.Lock() + defer peer.lock.Unlock() + peer.status = status +} + +func (peer *Peer) GetStatus() PeerStatus { + peer.lock.RLock() + defer peer.lock.RUnlock() + return peer.status +} + +func (peer *Peer) GetFinishedNum() int32 { + return peer.finishedNum.Load() +} + +func (peer *Peer) MarkLeave() { + peer.leave.Store(true) +} + +func (peer *Peer) IsLeave() bool { + return peer.leave.Load() +} + func (peer *Peer) IsRunning() bool { return peer.status == PeerStatusRunning } @@ -338,20 +331,6 @@ func (peer *Peer) 
IsBad() bool { return peer.status == PeerStatusFail || peer.status == PeerStatusZombie } -func (peer *Peer) GetFinishedNum() int32 { - return peer.finishedNum.Load() -} - -func (peer *Peer) GetStatus() PeerStatus { - peer.lock.RLock() - defer peer.lock.RUnlock() - return peer.status -} - -func (peer *Peer) MarkLeave() { - peer.leave.Store(true) -} - -func (peer *Peer) IsLeave() bool { - return peer.leave.Load() +func (peer *Peer) IsFail() bool { + return peer.status == PeerStatusFail } diff --git a/scheduler/types/task.go b/scheduler/types/task.go index 9ee7a2b729f..ba2aa460e5d 100644 --- a/scheduler/types/task.go +++ b/scheduler/types/task.go @@ -32,6 +32,8 @@ func (status TaskStatus) String() string { return "Waiting" case TaskStatusRunning: return "Running" + case TaskStatusZombie: + return "Zombie" case TaskStatusSuccess: return "Success" case TaskStatusCDNRegisterFail: @@ -48,6 +50,7 @@ func (status TaskStatus) String() string { const ( TaskStatusWaiting TaskStatus = iota TaskStatusRunning + TaskStatusZombie TaskStatusSeeding TaskStatusSuccess TaskStatusCDNRegisterFail @@ -56,15 +59,14 @@ const ( ) type Task struct { - lock sync.RWMutex TaskID string URL string - Filter string URLMeta *base.UrlMeta DirectPiece []byte CreateTime time.Time lastAccessTime time.Time lastTriggerTime time.Time + lock sync.RWMutex pieceList map[int32]*PieceInfo PieceTotal int32 ContentLength int64 @@ -73,11 +75,10 @@ type Task struct { // TODO add cdnPeers } -func NewTask(taskID, url, filter string, meta *base.UrlMeta) *Task { +func NewTask(taskID, url string, meta *base.UrlMeta) *Task { return &Task{ TaskID: taskID, URL: url, - Filter: filter, URLMeta: meta, pieceList: make(map[int32]*PieceInfo), peers: sortedlist.NewSortedList(), @@ -85,6 +86,18 @@ func NewTask(taskID, url, filter string, meta *base.UrlMeta) *Task { } } +func (task *Task) AddPeer(peer *Peer) { + task.peers.UpdateOrAdd(peer) +} + +func (task *Task) DeletePeer(peer *Peer) { + task.peers.Delete(peer) +} + +func (task *Task) ListPeers() *sortedlist.SortedList { + return task.peers +} + func (task *Task) SetStatus(status TaskStatus) { task.lock.Lock() defer task.lock.Unlock() @@ -103,16 +116,6 @@ func (task *Task) GetPiece(pieceNum int32) *PieceInfo { return task.pieceList[pieceNum] } -func (task *Task) AddPeer(peer *Peer) { - task.peers.UpdateOrAdd(peer) -} - -func (task *Task) DeletePeer(peer *Peer) { - //task.lock.Lock() - //defer task.lock.Unlock() - task.peers.Delete(peer) -} - func (task *Task) AddPiece(p *PieceInfo) { task.lock.Lock() defer task.lock.Unlock() @@ -120,8 +123,6 @@ func (task *Task) AddPiece(p *PieceInfo) { } func (task *Task) GetLastTriggerTime() time.Time { - task.lock.RLock() - defer task.lock.RUnlock() return task.lastTriggerTime } @@ -131,9 +132,7 @@ func (task *Task) Touch() { task.lastAccessTime = time.Now() } -func (task *Task) SetLastTriggerTime(lastTriggerTime time.Time) { - task.lock.Lock() - defer task.lock.Unlock() +func (task *Task) UpdateLastTriggerTime(lastTriggerTime time.Time) { task.lastTriggerTime = lastTriggerTime } @@ -143,8 +142,20 @@ func (task *Task) GetLastAccessTime() time.Time { return task.lastAccessTime } -func (task *Task) ListPeers() *sortedlist.SortedList { - return task.peers +func (task *Task) Lock() { + task.lock.Lock() +} + +func (task *Task) UnLock() { + task.lock.Unlock() +} + +func (task *Task) RLock() { + task.lock.RLock() +} + +func (task *Task) RUnlock() { + task.lock.RUnlock() } const TinyFileSize = 128 @@ -158,28 +169,34 @@ type PieceInfo struct { PieceStyle base.PieceStyle } -// 
isSuccessCDN determines that whether the CDNStatus is success.
+// IsSuccess determines whether the task status is success.
 func (task *Task) IsSuccess() bool {
 	return task.status == TaskStatusSuccess
 }
 
+// IsFrozen determines whether the task status is frozen
 func (task *Task) IsFrozen() bool {
 	return task.status == TaskStatusFailed || task.status == TaskStatusWaiting || task.status == TaskStatusSourceError || task.status == TaskStatusCDNRegisterFail
 }
 
+// CanSchedule determines whether the task can be scheduled;
+// only a task whose status is seeding or success can be scheduled
 func (task *Task) CanSchedule() bool {
 	return task.status == TaskStatusSeeding || task.status == TaskStatusSuccess
 }
 
+// IsWaiting determines whether the task is waiting
 func (task *Task) IsWaiting() bool {
 	return task.status == TaskStatusWaiting
 }
 
+// IsHealth determines whether the task is healthy
 func (task *Task) IsHealth() bool {
-	return task.status == TaskStatusRunning || task.status == TaskStatusSuccess
+	return task.status == TaskStatusRunning || task.status == TaskStatusSuccess || task.status == TaskStatusSeeding
 }
 
+// IsFail determines whether the task has failed
 func (task *Task) IsFail() bool {
 	return task.status == TaskStatusFailed || task.status == TaskStatusSourceError || task.status == TaskStatusCDNRegisterFail
 }
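The tracing pattern this patch introduces (a package-level tracer obtained from otel.Tracer, one span per handler started with a SpanKind, attributes set on the span, errors recorded, span ended on return) is sketched below in minimal standalone form. This is a sketch, not part of the patch: the package, span, and function names (scheduler-example, example-handler, handleExample, doWork) are hypothetical, while the otel calls mirror the ones used in scheduler/server/scheduler_server.go and scheduler/daemon/cdn/d7y/manager.go above.

package main

import (
	"context"
	"errors"
	"fmt"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

// A component-scoped tracer, created once, as the patch does in init().
var tracer = otel.Tracer("scheduler-example")

// handleExample shows the span lifecycle used by the new handlers:
// start a server-kind span, attach attributes, record errors, end on return.
func handleExample(ctx context.Context, taskID string) error {
	var span trace.Span
	ctx, span = tracer.Start(ctx, "example-handler", trace.WithSpanKind(trace.SpanKindServer))
	defer span.End()

	// Attach request-scoped attributes, analogous to config.AttributeTaskID in the patch.
	span.SetAttributes(attribute.String("example.task.id", taskID))

	if err := doWork(ctx); err != nil {
		// Record the failure on the span instead of only logging it.
		span.RecordError(err)
		return err
	}
	span.AddEvent("work finished")
	return nil
}

// doWork stands in for the real scheduling logic.
func doWork(ctx context.Context) error {
	return errors.New("doWork is a placeholder")
}

func main() {
	fmt.Println(handleExample(context.Background(), "task-1"))
}

Until a concrete TracerProvider is registered, the global otel.Tracer returns a no-op implementation, so these spans add negligible overhead when tracing is disabled; the server likewise only wires the otelgrpc interceptors when Telemetry.Jaeger is configured, as shown in scheduler/server.go above.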