From 9608eff3bf91e24f3afea5417bf6a3522c56ec5d Mon Sep 17 00:00:00 2001
From: Gaius
Date: Wed, 15 Nov 2023 22:23:43 +0800
Subject: [PATCH] feat: avoid hot resolve in grpc (#2884)

Signed-off-by: Gaius
---
 client/config/dynconfig_manager.go       |  9 +++++++++
 internal/dynconfig/dynconfig.go          | 15 +++++++--------
 pkg/resolver/scheduler_resolver.go       | 10 +++++++++-
 pkg/resolver/seed_peer_resolver.go       | 10 +++++++++-
 pkg/rpc/health/client/client.go          |  3 +++
 scheduler/config/config.go               | 10 +++++++++-
 scheduler/config/config_test.go          | 17 ++++++++++++++++-
 scheduler/config/constants.go            |  5 +++++
 scheduler/config/dynconfig.go            |  9 +++++++++
 scheduler/config/testdata/scheduler.yaml |  1 +
 scheduler/service/service_v1.go          |  2 +-
 scheduler/service/service_v1_test.go     |  7 ++++++-
 12 files changed, 84 insertions(+), 14 deletions(-)

diff --git a/client/config/dynconfig_manager.go b/client/config/dynconfig_manager.go
index f30c22c7ba7..f88217b5dfd 100644
--- a/client/config/dynconfig_manager.go
+++ b/client/config/dynconfig_manager.go
@@ -24,6 +24,7 @@ import (
 	"os"
 	"path/filepath"
 	"reflect"
+	"sync"
 	"time"
 
 	"google.golang.org/grpc"
@@ -53,6 +54,7 @@ type dynconfigManager struct {
 	cachePath            string
 	transportCredentials credentials.TransportCredentials
 	schedulerClusterID   uint64
+	mu                   sync.Mutex
 }
 
 // newDynconfigManager returns a new manager dynconfig instance.
@@ -74,6 +76,7 @@ func newDynconfigManager(cfg *DaemonOption, rawManagerClient managerclient.V1, c
 		cachePath:            cachePath,
 		Dynconfig:            d,
 		transportCredentials: creds,
+		mu:                   sync.Mutex{},
 	}, nil
 }
 
@@ -201,6 +204,12 @@ func (d *dynconfigManager) GetObjectStorage() (*managerv1.ObjectStorage, error)
 
 // Refresh refreshes dynconfig in cache.
 func (d *dynconfigManager) Refresh() error {
+	// If another load is in progress, return directly.
+	if !d.mu.TryLock() {
+		return nil
+	}
+	defer d.mu.Unlock()
+
 	if err := d.Dynconfig.Refresh(); err != nil {
 		return err
 	}
diff --git a/internal/dynconfig/dynconfig.go b/internal/dynconfig/dynconfig.go
index 45498228067..84c28a3f134 100644
--- a/internal/dynconfig/dynconfig.go
+++ b/internal/dynconfig/dynconfig.go
@@ -79,14 +79,7 @@ func New[T any](client ManagerClient, cachePath string, expire time.Duration) (D
 
 // Refresh refreshes dynconfig in cache.
 func (d *dynconfig[T]) Refresh() error {
-	// Avoid hot reload.
-	if d.mu.TryLock() {
-		defer d.mu.Unlock()
-		return d.load()
-	}
-
-	// If reload is in progress, return nil.
-	return nil
+	return d.load()
 }
 
 // Get dynamic config.
@@ -115,6 +108,12 @@ func (d *dynconfig[T]) Get() (*T, error) {
 
 // Load dynamic config from manager.
 func (d *dynconfig[T]) load() error {
+	// If another load is in progress, return directly.
+	if !d.mu.TryLock() {
+		return errors.New("another load is in progress")
+	}
+	defer d.mu.Unlock()
+
 	rawData, err := d.client.Get()
 	if err != nil {
 		return err
diff --git a/pkg/resolver/scheduler_resolver.go b/pkg/resolver/scheduler_resolver.go
index de4f92cb811..4d5e9260ff1 100644
--- a/pkg/resolver/scheduler_resolver.go
+++ b/pkg/resolver/scheduler_resolver.go
@@ -18,6 +18,7 @@ package resolver
 
 import (
 	"reflect"
+	"sync"
 
 	"google.golang.org/grpc/grpclog"
 	"google.golang.org/grpc/resolver"
@@ -42,11 +43,12 @@ type SchedulerResolver struct {
 	addrs     []resolver.Address
 	cc        resolver.ClientConn
 	dynconfig config.Dynconfig
+	mu        *sync.Mutex
 }
 
 // RegisterScheduler registers the dragonfly resolver builder to the grpc with custom schema.
 func RegisterScheduler(dynconfig config.Dynconfig) {
-	resolver.Register(&SchedulerResolver{dynconfig: dynconfig})
+	resolver.Register(&SchedulerResolver{dynconfig: dynconfig, mu: &sync.Mutex{}})
 }
 
 // Scheme returns the resolver scheme.
@@ -70,6 +72,12 @@ func (r *SchedulerResolver) Build(target resolver.Target, cc resolver.ClientConn
 // to refresh addresses from manager when all SubConn fail.
 // So here we don't trigger resolving to reduce the pressure of manager.
 func (r *SchedulerResolver) ResolveNow(resolver.ResolveNowOptions) {
+	// Avoid concurrent GetResolveSchedulerAddrs calls.
+	if !r.mu.TryLock() {
+		return
+	}
+	defer r.mu.Unlock()
+
 	addrs, err := r.dynconfig.GetResolveSchedulerAddrs()
 	if err != nil {
 		plogger.Errorf("resolve addresses error %v", err)
diff --git a/pkg/resolver/seed_peer_resolver.go b/pkg/resolver/seed_peer_resolver.go
index 9914c3b51b0..63ea4801e4a 100644
--- a/pkg/resolver/seed_peer_resolver.go
+++ b/pkg/resolver/seed_peer_resolver.go
@@ -19,6 +19,7 @@ package resolver
 import (
 	"fmt"
 	"reflect"
+	"sync"
 
 	"google.golang.org/grpc/grpclog"
 	"google.golang.org/grpc/resolver"
@@ -43,11 +44,12 @@ type SeedPeerResolver struct {
 	addrs     []resolver.Address
 	cc        resolver.ClientConn
 	dynconfig config.DynconfigInterface
+	mu        *sync.Mutex
 }
 
 // RegisterSeedPeer registers the dragonfly resolver builder to the grpc with custom schema.
 func RegisterSeedPeer(dynconfig config.DynconfigInterface) {
-	resolver.Register(&SeedPeerResolver{dynconfig: dynconfig})
+	resolver.Register(&SeedPeerResolver{dynconfig: dynconfig, mu: &sync.Mutex{}})
 }
 
 // Scheme returns the resolver scheme.
@@ -71,6 +73,12 @@ func (r *SeedPeerResolver) Build(target resolver.Target, cc resolver.ClientConn,
 // to refresh addresses from manager when all SubConn fail.
 // So here we don't trigger resolving to reduce the pressure of manager.
 func (r *SeedPeerResolver) ResolveNow(resolver.ResolveNowOptions) {
+	// Avoid concurrent GetResolveSeedPeerAddrs calls.
+	if !r.mu.TryLock() {
+		return
+	}
+	defer r.mu.Unlock()
+
 	addrs, err := r.dynconfig.GetResolveSeedPeerAddrs()
 	if err != nil {
 		slogger.Errorf("resolve addresses error %v", err)
diff --git a/pkg/rpc/health/client/client.go b/pkg/rpc/health/client/client.go
index 5a53b68f142..7434701f37a 100644
--- a/pkg/rpc/health/client/client.go
+++ b/pkg/rpc/health/client/client.go
@@ -68,6 +68,9 @@ func GetClient(ctx context.Context, target string, opts ...grpc.DialOption) (Cli
 
 // Check checks health of grpc server.
 func Check(ctx context.Context, target string, opts ...grpc.DialOption) error {
+	ctx, cancel := context.WithTimeout(ctx, contextTimeout)
+	defer cancel()
+
 	healthClient, err := GetClient(ctx, target, opts...)
 	if err != nil {
 		return err
diff --git a/scheduler/config/config.go b/scheduler/config/config.go
index 1d55276ad78..1641fe1ef98 100644
--- a/scheduler/config/config.go
+++ b/scheduler/config/config.go
@@ -217,6 +217,9 @@ type ManagerConfig struct {
 type SeedPeerConfig struct {
 	// Enable is to enable seed peer as P2P peer.
 	Enable bool `yaml:"enable" mapstructure:"enable"`
+
+	// TaskDownloadTimeout is the timeout for downloading a task by the seed peer.
+	TaskDownloadTimeout time.Duration `yaml:"taskDownloadTimeout" mapstructure:"taskDownloadTimeout"`
 }
 
 type KeepAliveConfig struct {
@@ -416,7 +419,8 @@ func New() *Config {
 			},
 		},
 		SeedPeer: SeedPeerConfig{
-			Enable: true,
+			Enable:              true,
+			TaskDownloadTimeout: DefaultSeedPeerTaskDownloadTimeout,
 		},
 		Job: JobConfig{
 			Enable: true,
@@ -570,6 +574,10 @@ func (cfg *Config) Validate() error {
 		return errors.New("manager requires parameter keepAlive interval")
 	}
 
+	if cfg.SeedPeer.TaskDownloadTimeout <= 0 {
+		return errors.New("seedPeer requires parameter taskDownloadTimeout")
+	}
+
 	if cfg.Job.Enable {
 		if cfg.Job.GlobalWorkerNum == 0 {
 			return errors.New("job requires parameter globalWorkerNum")
diff --git a/scheduler/config/config_test.go b/scheduler/config/config_test.go
index 2cd1dcdfbbd..87044bc3e8c 100644
--- a/scheduler/config/config_test.go
+++ b/scheduler/config/config_test.go
@@ -136,7 +136,8 @@ func TestConfig_Load(t *testing.T) {
 			},
 		},
 		SeedPeer: SeedPeerConfig{
-			Enable: true,
+			Enable:              true,
+			TaskDownloadTimeout: 12 * time.Hour,
 		},
 		Host: HostConfig{
 			IDC: "foo",
@@ -560,6 +561,20 @@ func TestConfig_Validate(t *testing.T) {
 				assert.EqualError(err, "manager requires parameter keepAlive interval")
 			},
 		},
+		{
+			name:   "seedPeer requires parameter taskDownloadTimeout",
+			config: New(),
+			mock: func(cfg *Config) {
+				cfg.Manager = mockManagerConfig
+				cfg.Database.Redis = mockRedisConfig
+				cfg.Job = mockJobConfig
+				cfg.SeedPeer.TaskDownloadTimeout = 0
+			},
+			expect: func(t *testing.T, err error) {
+				assert := assert.New(t)
+				assert.EqualError(err, "seedPeer requires parameter taskDownloadTimeout")
+			},
+		},
 		{
 			name:   "job requires parameter globalWorkerNum",
 			config: New(),
diff --git a/scheduler/config/constants.go b/scheduler/config/constants.go
index 5d3f22fcd10..7bf47a5425f 100644
--- a/scheduler/config/constants.go
+++ b/scheduler/config/constants.go
@@ -124,6 +124,11 @@ const (
 	DefaultManagerKeepAliveInterval = 5 * time.Second
 )
 
+const (
+	// DefaultSeedPeerTaskDownloadTimeout is the default timeout for downloading a task by the seed peer.
+	DefaultSeedPeerTaskDownloadTimeout = 10 * time.Hour
+)
+
 const (
 	// DefaultJobGlobalWorkerNum is default global worker number for job.
 	DefaultJobGlobalWorkerNum = 500
diff --git a/scheduler/config/dynconfig.go b/scheduler/config/dynconfig.go
index 8162ea0a6fd..7be8c8b34a4 100644
--- a/scheduler/config/dynconfig.go
+++ b/scheduler/config/dynconfig.go
@@ -26,6 +26,7 @@ import (
 	"net"
 	"os"
 	"path/filepath"
+	"sync"
 	"time"
 
 	"google.golang.org/grpc"
@@ -114,6 +115,7 @@ type dynconfig struct {
 	done                 chan struct{}
 	cachePath            string
 	transportCredentials credentials.TransportCredentials
+	mu                   *sync.Mutex
 }
 
 // DynconfigOption is a functional option for configuring the dynconfig.
@@ -135,6 +137,7 @@ func NewDynconfig(rawManagerClient managerclient.V2, cacheDir string, cfg *Confi
 		observers: map[Observer]struct{}{},
 		done:      make(chan struct{}),
 		cachePath: cachePath,
+		mu:        &sync.Mutex{},
 	}
 
 	for _, opt := range options {
@@ -314,6 +317,12 @@ func (d *dynconfig) GetSchedulerClusterClientConfig() (types.SchedulerClusterCli
 
 // Refresh refreshes dynconfig in cache.
 func (d *dynconfig) Refresh() error {
+	// If another load is in progress, return directly.
+	if !d.mu.TryLock() {
+		return nil
+	}
+	defer d.mu.Unlock()
+
 	if err := d.Dynconfig.Refresh(); err != nil {
 		return err
 	}
diff --git a/scheduler/config/testdata/scheduler.yaml b/scheduler/config/testdata/scheduler.yaml
index 3344c3d2b56..d18f2c026d4 100644
--- a/scheduler/config/testdata/scheduler.yaml
+++ b/scheduler/config/testdata/scheduler.yaml
@@ -59,6 +59,7 @@ manager:
 
 seedPeer:
   enable: true
+  taskDownloadTimeout: 12h
 
 job:
   enable: true
diff --git a/scheduler/service/service_v1.go b/scheduler/service/service_v1.go
index 53d7ae548b4..264badfe3dc 100644
--- a/scheduler/service/service_v1.go
+++ b/scheduler/service/service_v1.go
@@ -840,7 +840,7 @@ func (v *V1) triggerTask(ctx context.Context, req *schedulerv1.PeerTaskRequest,
 
 // triggerSeedPeerTask starts to trigger seed peer task.
 func (v *V1) triggerSeedPeerTask(ctx context.Context, rg *http.Range, task *resource.Task) {
-	ctx, cancel := context.WithCancel(trace.ContextWithSpan(context.Background(), trace.SpanFromContext(ctx)))
+	ctx, cancel := context.WithTimeout(trace.ContextWithSpan(context.Background(), trace.SpanFromContext(ctx)), v.config.SeedPeer.TaskDownloadTimeout)
 	defer cancel()
 
 	task.Log.Info("trigger seed peer")
diff --git a/scheduler/service/service_v1_test.go b/scheduler/service/service_v1_test.go
index ce5be18ca5a..7005ce99015 100644
--- a/scheduler/service/service_v1_test.go
+++ b/scheduler/service/service_v1_test.go
@@ -74,6 +74,11 @@ var (
 		BackToSourceCount: int(mockTaskBackToSourceLimit),
 	}
 
+	mockSeedPeerConfig = config.SeedPeerConfig{
+		Enable:              true,
+		TaskDownloadTimeout: 1 * time.Hour,
+	}
+
 	mockNetworkTopologyConfig = config.NetworkTopologyConfig{
 		Enable:          true,
 		CollectInterval: 2 * time.Hour,
@@ -3850,7 +3855,7 @@ func TestServiceV1_triggerSeedPeerTask(t *testing.T) {
 				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
 			task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
 			peer := resource.NewPeer(mockPeerID, mockResourceConfig, task, mockHost)
-			svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
+			svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, SeedPeer: mockSeedPeerConfig}, res, scheduling, dynconfig, storage, networkTopology)
 
 			tc.mock(task, peer, seedPeer, res.EXPECT(), seedPeer.EXPECT())
 			svc.triggerSeedPeerTask(context.Background(), &mockPeerRange, task)
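
The heart of this patch is moving the non-blocking guard from Refresh into the load path and the resolvers: a caller that finds another load already in flight returns immediately instead of queuing behind it. Below is a minimal, self-contained Go sketch of that TryLock guard; the names (refresher, load) are illustrative, not the actual Dragonfly types.

package main

import (
	"fmt"
	"sync"
	"time"
)

type refresher struct {
	mu sync.Mutex
}

// Refresh returns immediately if another refresh is already in progress,
// so a burst of triggers results in at most one in-flight load.
func (r *refresher) Refresh() error {
	if !r.mu.TryLock() {
		return nil
	}
	defer r.mu.Unlock()

	return r.load()
}

// load stands in for the expensive call to the manager.
func (r *refresher) load() error {
	time.Sleep(100 * time.Millisecond)
	return nil
}

func main() {
	r := &refresher{}

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = r.Refresh() // only one goroutine actually loads
		}()
	}
	wg.Wait()
	fmt.Println("done")
}

Run concurrently, only one goroutine performs the expensive load while the rest return at once, which is the behavior the resolvers and dynconfig now rely on to keep ResolveNow storms from hammering the manager.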
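The triggerSeedPeerTask change swaps context.WithCancel for context.WithTimeout on a detached background context, so the seed peer download still outlives the RPC that started it but is now bounded by the configurable SeedPeer.TaskDownloadTimeout. A sketch of that detach-and-bound pattern under assumed names (triggerDownload), omitting the trace-span propagation the real code keeps:

package main

import (
	"context"
	"fmt"
	"time"
)

// triggerDownload stands in for the seed peer download.
func triggerDownload(ctx context.Context) error {
	select {
	case <-time.After(2 * time.Second):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	taskDownloadTimeout := 1 * time.Second // e.g. SeedPeer.TaskDownloadTimeout

	// Detach from the caller's context but cap the total runtime.
	ctx, cancel := context.WithTimeout(context.Background(), taskDownloadTimeout)
	defer cancel()

	if err := triggerDownload(ctx); err != nil {
		fmt.Println("download aborted:", err) // context deadline exceeded
	}
}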