feat: avoid hot resolve in grpc
Signed-off-by: Gaius <[email protected]>
gaius-qi committed Nov 15, 2023
1 parent 4d4ea8d commit 4a48421
Showing 12 changed files with 84 additions and 14 deletions.
9 changes: 9 additions & 0 deletions client/config/dynconfig_manager.go
@@ -24,6 +24,7 @@ import (
    "os"
    "path/filepath"
    "reflect"
+   "sync"
    "time"

    "google.golang.org/grpc"
@@ -53,6 +54,7 @@ type dynconfigManager struct {
    cachePath            string
    transportCredentials credentials.TransportCredentials
    schedulerClusterID   uint64
+   mu                   sync.Mutex
}

// newDynconfigManager returns a new manager dynconfig instance.
@@ -74,6 +76,7 @@ func newDynconfigManager(cfg *DaemonOption, rawManagerClient managerclient.V1, c
        cachePath:            cachePath,
        Dynconfig:            d,
        transportCredentials: creds,
+       mu:                   sync.Mutex{},
    }, nil
}

@@ -201,6 +204,12 @@ func (d *dynconfigManager) GetObjectStorage() (*managerv1.ObjectStorage, error)

// Refresh refreshes dynconfig in cache.
func (d *dynconfigManager) Refresh() error {
+   // If another load is in progress, return directly.
+   if !d.mu.TryLock() {
+       return nil
+   }
+   defer d.mu.Unlock()
+
    if err := d.Dynconfig.Refresh(); err != nil {
        return err
    }
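The mechanism behind all of these changes is sync.Mutex.TryLock, added in Go 1.18: a non-blocking lock attempt that lets overlapping refreshes collapse into a single load instead of queueing behind one another. A minimal, self-contained sketch of the pattern (illustrative code, not Dragonfly's; the names are ours):

package main

import (
    "fmt"
    "sync"
    "time"
)

// refresher deduplicates concurrent Refresh calls: at most one goroutine
// performs the expensive load at a time; the rest return immediately.
type refresher struct {
    mu sync.Mutex
}

func (r *refresher) Refresh() error {
    // TryLock (Go 1.18+) fails fast instead of blocking when the lock is held.
    if !r.mu.TryLock() {
        return nil // another refresh is already in progress
    }
    defer r.mu.Unlock()

    time.Sleep(100 * time.Millisecond) // stand-in for the remote load
    fmt.Println("loaded config from manager")
    return nil
}

func main() {
    var r refresher
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            _ = r.Refresh()
        }()
    }
    wg.Wait() // typically prints once, not ten times
}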
15 changes: 7 additions & 8 deletions internal/dynconfig/dynconfig.go
@@ -79,14 +79,7 @@ func New[T any](client ManagerClient, cachePath string, expire time.Duration) (D

// Refresh refreshes dynconfig in cache.
func (d *dynconfig[T]) Refresh() error {
-   // Avoid hot reload.
-   if d.mu.TryLock() {
-       defer d.mu.Unlock()
-       return d.load()
-   }
-
-   // If reload is in progress, return nil.
-   return nil
+   return d.load()
}

// Get dynamic config.
@@ -115,6 +108,12 @@ func (d *dynconfig[T]) Get() (*T, error) {

// Load dynamic config from manager.
func (d *dynconfig[T]) load() error {
+   // If another load is in progress, return directly.
+   if !d.mu.TryLock() {
+       return errors.New("another load is in progress")
+   }
+   defer d.mu.Unlock()
+
    rawData, err := d.client.Get()
    if err != nil {
        return err
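A design note on this file: the guard moves from Refresh down into load. Judging by the expire duration that New takes, Get also reaches load when its cache has lapsed, so a single TryLock now covers both entry points. load reports contention as an error ("another load is in progress") so direct callers know nothing was reloaded, while the Refresh wrappers elsewhere in this commit treat contention as a benign no-op and return nil.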
10 changes: 9 additions & 1 deletion pkg/resolver/scheduler_resolver.go
@@ -18,6 +18,7 @@ package resolver

import (
    "reflect"
+   "sync"

    "google.golang.org/grpc/grpclog"
    "google.golang.org/grpc/resolver"
@@ -42,11 +43,12 @@ type SchedulerResolver struct {
    addrs     []resolver.Address
    cc        resolver.ClientConn
    dynconfig config.Dynconfig
+   mu        *sync.Mutex
}

// RegisterScheduler registers the dragonfly resolver builder with gRPC under a custom scheme.
func RegisterScheduler(dynconfig config.Dynconfig) {
-   resolver.Register(&SchedulerResolver{dynconfig: dynconfig})
+   resolver.Register(&SchedulerResolver{dynconfig: dynconfig, mu: &sync.Mutex{}})
}

// Scheme returns the resolver scheme.
@@ -70,6 +72,12 @@ func (r *SchedulerResolver) Build(target resolver.Target, cc resolver.ClientConn

// to refresh addresses from manager when all SubConn fail.
// So here we don't trigger resolving, to reduce the pressure on the manager.
func (r *SchedulerResolver) ResolveNow(resolver.ResolveNowOptions) {
+   // Avoid concurrent GetResolveSchedulerAddrs calls.
+   if !r.mu.TryLock() {
+       return
+   }
+   defer r.mu.Unlock()
+
    addrs, err := r.dynconfig.GetResolveSchedulerAddrs()
    if err != nil {
        plogger.Errorf("resolve addresses error %v", err)
10 changes: 9 additions & 1 deletion pkg/resolver/seed_peer_resolver.go
@@ -19,6 +19,7 @@ package resolver

import (
    "fmt"
    "reflect"
+   "sync"

    "google.golang.org/grpc/grpclog"
    "google.golang.org/grpc/resolver"
@@ -43,11 +44,12 @@ type SeedPeerResolver struct {
    addrs     []resolver.Address
    cc        resolver.ClientConn
    dynconfig config.DynconfigInterface
+   mu        *sync.Mutex
}

// RegisterSeedPeer registers the dragonfly resolver builder with gRPC under a custom scheme.
func RegisterSeedPeer(dynconfig config.DynconfigInterface) {
-   resolver.Register(&SeedPeerResolver{dynconfig: dynconfig})
+   resolver.Register(&SeedPeerResolver{dynconfig: dynconfig, mu: &sync.Mutex{}})
}

// Scheme returns the resolver scheme.
@@ -71,6 +73,12 @@ func (r *SeedPeerResolver) Build(target resolver.Target, cc resolver.ClientConn,

// to refresh addresses from manager when all SubConn fail.
// So here we don't trigger resolving, to reduce the pressure on the manager.
func (r *SeedPeerResolver) ResolveNow(resolver.ResolveNowOptions) {
+   // Avoid concurrent GetResolveSeedPeerAddrs calls.
+   if !r.mu.TryLock() {
+       return
+   }
+   defer r.mu.Unlock()
+
    addrs, err := r.dynconfig.GetResolveSeedPeerAddrs()
    if err != nil {
        slogger.Errorf("resolve addresses error %v", err)
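Both resolvers now guard ResolveNow the same way: gRPC may trigger ResolveNow from many failing SubConns at once, and each call previously hit the manager through GetResolveSchedulerAddrs or GetResolveSeedPeerAddrs. A sketch of the shape of such a guarded resolver, using only the public google.golang.org/grpc/resolver API (the builder, scheme, and fetch function are illustrative, not Dragonfly's):

package main

import (
    "sync"

    "google.golang.org/grpc/resolver"
)

// staticResolver is a hypothetical resolver: fetch stands in for the
// dynconfig lookup. As in this commit, overlapping ResolveNow triggers
// collapse into a single fetch via the non-blocking TryLock.
type staticResolver struct {
    cc    resolver.ClientConn
    fetch func() []resolver.Address
    mu    *sync.Mutex
}

func (r *staticResolver) ResolveNow(resolver.ResolveNowOptions) {
    if !r.mu.TryLock() {
        return // a fetch is already running; skip this trigger
    }
    defer r.mu.Unlock()

    // UpdateState may return an error, e.g. while the ClientConn shuts down.
    _ = r.cc.UpdateState(resolver.State{Addresses: r.fetch()})
}

func (r *staticResolver) Close() {}

// staticBuilder wires the resolver into gRPC under the "static" scheme.
type staticBuilder struct {
    fetch func() []resolver.Address
}

func (b *staticBuilder) Build(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
    r := &staticResolver{cc: cc, fetch: b.fetch, mu: &sync.Mutex{}}
    r.ResolveNow(resolver.ResolveNowOptions{})
    return r, nil
}

func (b *staticBuilder) Scheme() string { return "static" }

func main() {
    resolver.Register(&staticBuilder{
        fetch: func() []resolver.Address {
            return []resolver.Address{{Addr: "127.0.0.1:8002"}}
        },
    })
    // A grpc.Dial("static:///schedulers", ...) would now resolve through it.
}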
3 changes: 3 additions & 0 deletions pkg/rpc/health/client/client.go
@@ -68,6 +68,9 @@ func GetClient(ctx context.Context, target string, opts ...grpc.DialOption) (Cli

// Check checks health of grpc server.
func Check(ctx context.Context, target string, opts ...grpc.DialOption) error {
+   ctx, cancel := context.WithTimeout(ctx, contextTimeout)
+   defer cancel()
+
    healthClient, err := GetClient(ctx, target, opts...)
    if err != nil {
        return err
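Check now bounds the whole health probe with a deadline. contextTimeout is a package constant defined outside the lines shown in this diff; the sketch below uses a hypothetical stand-in to show the WithTimeout/defer cancel pattern:

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// checkTimeout stands in for the package-level contextTimeout constant,
// which is defined outside the lines shown in this diff.
const checkTimeout = 500 * time.Millisecond

func check(ctx context.Context) error {
    // Derive a child context that is cancelled automatically at the
    // deadline; defer cancel() releases the timer on every return path.
    ctx, cancel := context.WithTimeout(ctx, checkTimeout)
    defer cancel()

    select {
    case <-time.After(time.Second): // stand-in for the health RPC
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

func main() {
    err := check(context.Background())
    fmt.Println(err, errors.Is(err, context.DeadlineExceeded)) // ... true
}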
10 changes: 9 additions & 1 deletion scheduler/config/config.go
@@ -217,6 +217,9 @@ type ManagerConfig struct {

type SeedPeerConfig struct {
    // Enable is to enable seed peer as P2P peer.
    Enable bool `yaml:"enable" mapstructure:"enable"`
+
+   // TaskDownloadTimeout is the timeout for a task download by the seed peer.
+   TaskDownloadTimeout time.Duration `yaml:"taskDownloadTimeout" mapstructure:"taskDownloadTimeout"`
}

type KeepAliveConfig struct {
@@ -416,7 +419,8 @@ func New() *Config {
        },
    },
    SeedPeer: SeedPeerConfig{
-       Enable: true,
+       Enable:              true,
+       TaskDownloadTimeout: DefaultSeedPeerTaskDownloadTimeout,
    },
    Job: JobConfig{
        Enable: true,
@@ -570,6 +574,10 @@ func (cfg *Config) Validate() error {
        return errors.New("manager requires parameter keepAlive interval")
    }

+   if cfg.SeedPeer.TaskDownloadTimeout <= 0 {
+       return errors.New("seedPeer requires parameter taskDownloadTimeout")
+   }
+
    if cfg.Job.Enable {
        if cfg.Job.GlobalWorkerNum == 0 {
            return errors.New("job requires parameter globalWorkerNum")
17 changes: 16 additions & 1 deletion scheduler/config/config_test.go
@@ -136,7 +136,8 @@ func TestConfig_Load(t *testing.T) {
        },
    },
    SeedPeer: SeedPeerConfig{
-       Enable: true,
+       Enable:              true,
+       TaskDownloadTimeout: 12 * time.Hour,
    },
    Host: HostConfig{
        IDC: "foo",
@@ -560,6 +561,20 @@ func TestConfig_Validate(t *testing.T) {
            assert.EqualError(err, "manager requires parameter keepAlive interval")
        },
    },
+   {
+       name:   "seedPeer requires parameter taskDownloadTimeout",
+       config: New(),
+       mock: func(cfg *Config) {
+           cfg.Manager = mockManagerConfig
+           cfg.Database.Redis = mockRedisConfig
+           cfg.Job = mockJobConfig
+           cfg.SeedPeer.TaskDownloadTimeout = 0
+       },
+       expect: func(t *testing.T, err error) {
+           assert := assert.New(t)
+           assert.EqualError(err, "seedPeer requires parameter taskDownloadTimeout")
+       },
+   },
    {
        name:   "job requires parameter globalWorkerNum",
        config: New(),
5 changes: 5 additions & 0 deletions scheduler/config/constants.go
@@ -124,6 +124,11 @@ const (
    DefaultManagerKeepAliveInterval = 5 * time.Second
)

+const (
+   // DefaultSeedPeerTaskDownloadTimeout is the default timeout for a task download by the seed peer.
+   DefaultSeedPeerTaskDownloadTimeout = 10 * time.Hour
+)
+
const (
    // DefaultJobGlobalWorkerNum is default global worker number for job.
    DefaultJobGlobalWorkerNum = 500
9 changes: 9 additions & 0 deletions scheduler/config/dynconfig.go
@@ -26,6 +26,7 @@ import (
    "net"
    "os"
    "path/filepath"
+   "sync"
    "time"

    "google.golang.org/grpc"
@@ -114,6 +115,7 @@ type dynconfig struct {
    done                 chan struct{}
    cachePath            string
    transportCredentials credentials.TransportCredentials
+   mu                   *sync.Mutex
}

// DynconfigOption is a functional option for configuring the dynconfig.
@@ -135,6 +137,7 @@ func NewDynconfig(rawManagerClient managerclient.V2, cacheDir string, cfg *Confi
        observers: map[Observer]struct{}{},
        done:      make(chan struct{}),
        cachePath: cachePath,
+       mu:        &sync.Mutex{},
    }

    for _, opt := range options {
@@ -314,6 +317,12 @@ func (d *dynconfig) GetSchedulerClusterClientConfig() (types.SchedulerClusterCli

// Refresh refreshes dynconfig in cache.
func (d *dynconfig) Refresh() error {
+   // If another load is in progress, return directly.
+   if !d.mu.TryLock() {
+       return nil
+   }
+   defer d.mu.Unlock()
+
    if err := d.Dynconfig.Refresh(); err != nil {
        return err
    }
1 change: 1 addition & 0 deletions scheduler/config/testdata/scheduler.yaml
@@ -59,6 +59,7 @@ manager:

seedPeer:
  enable: true
+  taskDownloadTimeout: 12h

job:
  enable: true
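The 12h value is decoded into the time.Duration field using Go's duration syntax; this assumes the config loader registers a string-to-duration decode hook, as viper/mapstructure setups commonly do:

package main

import (
    "fmt"
    "time"
)

func main() {
    // "12h" is Go duration syntax; "90m", "1h30m", or "12h0m0s" would
    // also be accepted by time.ParseDuration.
    d, err := time.ParseDuration("12h")
    if err != nil {
        panic(err)
    }
    fmt.Println(d == 12*time.Hour) // true
}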
2 changes: 1 addition & 1 deletion scheduler/service/service_v1.go
@@ -840,7 +840,7 @@ func (v *V1) triggerTask(ctx context.Context, req *schedulerv1.PeerTaskRequest,

// triggerSeedPeerTask starts to trigger seed peer task.
func (v *V1) triggerSeedPeerTask(ctx context.Context, rg *http.Range, task *resource.Task) {
-   ctx, cancel := context.WithCancel(trace.ContextWithSpan(context.Background(), trace.SpanFromContext(ctx)))
+   ctx, cancel := context.WithTimeout(trace.ContextWithSpan(context.Background(), trace.SpanFromContext(ctx)), v.config.SeedPeer.TaskDownloadTimeout)
    defer cancel()

    task.Log.Info("trigger seed peer")
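This is the substantive behavior change: triggerSeedPeerTask deliberately detaches from the incoming RPC context by building on context.Background(), so the seed-peer download survives the request, while trace.ContextWithSpan carries the caller's span across the detachment. Previously that detached context was cancel-only and therefore effectively unbounded; it is now capped by TaskDownloadTimeout. A sketch of the detach-but-keep-span pattern (the helper name is ours, not the project's):

package main

import (
    "context"
    "time"

    "go.opentelemetry.io/otel/trace"
)

// detachWithSpan returns a context that ignores the parent's cancellation
// but keeps its trace span, bounded by the given timeout. This mirrors the
// pattern in triggerSeedPeerTask.
func detachWithSpan(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
    span := trace.SpanFromContext(parent)
    detached := trace.ContextWithSpan(context.Background(), span)
    return context.WithTimeout(detached, timeout)
}

func main() {
    // Even after the RPC context is cancelled, the detached context stays
    // alive until its own timeout, so the background download can finish.
    rpcCtx, rpcCancel := context.WithCancel(context.Background())
    taskCtx, cancel := detachWithSpan(rpcCtx, 10*time.Hour)
    defer cancel()

    rpcCancel()
    select {
    case <-taskCtx.Done():
        // not reached: taskCtx does not inherit rpcCancel
    default:
        // taskCtx is still usable for the long-running download
    }
}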
7 changes: 6 additions & 1 deletion scheduler/service/service_v1_test.go
@@ -74,6 +74,11 @@ var (
        BackToSourceCount: int(mockTaskBackToSourceLimit),
    }

+   mockSeedPeerConfig = config.SeedPeerConfig{
+       Enable:              true,
+       TaskDownloadTimeout: 1 * time.Hour,
+   }
+
    mockNetworkTopologyConfig = config.NetworkTopologyConfig{
        Enable:          true,
        CollectInterval: 2 * time.Hour,
@@ -3850,7 +3855,7 @@ func TestServiceV1_triggerSeedPeerTask(t *testing.T) {
        mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
    task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
    peer := resource.NewPeer(mockPeerID, mockResourceConfig, task, mockHost)
-   svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
+   svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, SeedPeer: mockSeedPeerConfig}, res, scheduling, dynconfig, storage, networkTopology)

    tc.mock(task, peer, seedPeer, res.EXPECT(), seedPeer.EXPECT())
    svc.triggerSeedPeerTask(context.Background(), &mockPeerRange, task)
