Skip to content

Commit

Permalink
feat: support mutli manager addrs (#846)
Browse files Browse the repository at this point in the history
* feat: support mutli manager addrs (#844)

Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Dec 1, 2021
1 parent ee68e97 commit 4542d21
Show file tree
Hide file tree
Showing 10 changed files with 45 additions and 21 deletions.
6 changes: 3 additions & 3 deletions client/config/dynconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ type dynconfig struct {
done chan bool
}

func NewDynconfig(managerClient internaldynconfig.ManagerClient, expire time.Duration) (Dynconfig, error) {
func NewDynconfig(rawManagerClient managerclient.Client, hostOption HostOption, expire time.Duration) (Dynconfig, error) {
client, err := internaldynconfig.New(
internaldynconfig.ManagerSourceType,
internaldynconfig.WithManagerClient(managerClient),
internaldynconfig.WithManagerClient(newManagerClient(rawManagerClient, hostOption)),
internaldynconfig.WithExpireTime(expire),
internaldynconfig.WithCachePath(cachePath),
)
Expand Down Expand Up @@ -172,7 +172,7 @@ type managerClient struct {
}

// New the manager client used by dynconfig
func NewManagerClient(client managerclient.Client, hostOption HostOption) internaldynconfig.ManagerClient {
func newManagerClient(client managerclient.Client, hostOption HostOption) internaldynconfig.ManagerClient {
return &managerClient{
Client: client,
hostOption: hostOption,
Expand Down
6 changes: 3 additions & 3 deletions client/config/dynconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestDynconfigNewDynconfig(t *testing.T) {

mockManagerClient := mocks.NewMockClient(ctl)
tc.mock(mockManagerClient.EXPECT())
_, err := NewDynconfig(NewManagerClient(mockManagerClient, tc.hostOption), tc.expire)
_, err := NewDynconfig(mockManagerClient, tc.hostOption, tc.expire)
tc.expect(t, err)
tc.cleanFileCache(t)
})
Expand Down Expand Up @@ -283,7 +283,7 @@ func TestDynconfigGet(t *testing.T) {

mockManagerClient := mocks.NewMockClient(ctl)
tc.mock(mockManagerClient.EXPECT(), tc.data)
dynconfig, err := NewDynconfig(NewManagerClient(mockManagerClient, tc.hostOption), tc.expire)
dynconfig, err := NewDynconfig(mockManagerClient, tc.hostOption, tc.expire)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -462,7 +462,7 @@ func TestDynconfigGetSchedulers(t *testing.T) {

mockManagerClient := mocks.NewMockClient(ctl)
tc.mock(mockManagerClient.EXPECT(), tc.data)
dynconfig, err := NewDynconfig(NewManagerClient(mockManagerClient, tc.hostOption), tc.expire)
dynconfig, err := NewDynconfig(mockManagerClient, tc.hostOption, tc.expire)
if err != nil {
t.Fatal(err)
}
Expand Down
6 changes: 3 additions & 3 deletions client/config/peerhost.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (p *DaemonOption) Validate() error {
}

if p.Scheduler.Manager.Enable {
if p.Scheduler.Manager.Addr == "" {
if len(p.Scheduler.NetAddrs) == 0 {
return errors.New("manager addr is not specified")
}

Expand All @@ -140,8 +140,8 @@ type SchedulerOption struct {
type ManagerOption struct {
// Enable get configuration from manager
Enable bool `mapstructure:"enable" yaml:"enable"`
// Addr is manager addresse
Addr string `mapstructure:"addr" yaml:"addr"`
// NetAddrs is manager addresses.
NetAddrs []dfnet.NetAddr `mapstructure:"netAddrs" yaml:"netAddrs"`
// RefreshInterval is the refresh interval
RefreshInterval time.Duration `mapstructure:"refreshInterval" yaml:"refreshInterval"`
}
Expand Down
9 changes: 7 additions & 2 deletions client/config/peerhost_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,13 @@ func TestPeerHostOption_Load(t *testing.T) {
KeepStorage: false,
Scheduler: SchedulerOption{
Manager: ManagerOption{
Enable: false,
Addr: "127.0.0.1:65003",
Enable: false,
NetAddrs: []dfnet.NetAddr{
{
Type: dfnet.TCP,
Addr: "127.0.0.1:65003",
},
},
RefreshInterval: 5 * time.Minute,
},
NetAddrs: []dfnet.NetAddr{
Expand Down
4 changes: 3 additions & 1 deletion client/config/testdata/config/daemon.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ keepStorage: false
scheduler:
manager:
enable: false
addr: "127.0.0.1:65003"
netAddrs:
- type: tcp
addr: 127.0.0.1:65003
refreshInterval: 5m
netAddrs:
- type: tcp
Expand Down
7 changes: 2 additions & 5 deletions client/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,13 @@ func New(opt *config.DaemonOption) (Daemon, error) {
var dynconfig config.Dynconfig
if opt.Scheduler.Manager.Enable == true {
// New manager client
managerClient, err := managerclient.New(opt.Scheduler.Manager.Addr)
managerClient, err := managerclient.NewWithAddrs(opt.Scheduler.Manager.NetAddrs)
if err != nil {
return nil, err
}

// New dynconfig client
if dynconfig, err = config.NewDynconfig(
config.NewManagerClient(managerClient, opt.Host),
opt.Scheduler.Manager.RefreshInterval,
); err != nil {
if dynconfig, err = config.NewDynconfig(managerClient, opt.Host, opt.Scheduler.Manager.RefreshInterval); err != nil {
return nil, err
}

Expand Down
6 changes: 4 additions & 2 deletions docs/en/deployment/configuration/dfget.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ scheduler:
manager:
# get scheduler list dynamically from manager
enable: false
# manager service address
addr: 127.0.0.1:65003
# manager service addresses
netAddrs:
- type: tcp
addr: 127.0.0.1:65003
# scheduler list refresh interval
refreshInterval: 5m
# schedule timeout
Expand Down
4 changes: 3 additions & 1 deletion docs/zh-CN/deployment/configuration/dfget.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ scheduler:
# 通过 manager 接口动态获取 scheduler 列表
enable: false
# manager 服务地址
addr: 127.0.0.1:65003
netAddrs:
- type: tcp
addr: 127.0.0.1:65003
# scheduler 列表刷新时间
refreshInterval: 5m
# 调度超时
Expand Down
16 changes: 16 additions & 0 deletions pkg/rpc/manager/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package client

import (
"context"
"errors"
"time"

grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
Expand All @@ -27,6 +28,8 @@ import (
"google.golang.org/grpc/backoff"

logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/basic/dfnet"
"d7y.io/dragonfly/v2/pkg/reachable"
"d7y.io/dragonfly/v2/pkg/rpc/manager"
)

Expand Down Expand Up @@ -91,6 +94,19 @@ func New(target string) (Client, error) {
}, nil
}

func NewWithAddrs(netAddrs []dfnet.NetAddr) (Client, error) {
for _, netAddr := range netAddrs {
ipReachable := reachable.New(&reachable.Config{Address: netAddr.Addr})
if err := ipReachable.Check(); err == nil {
logger.Infof("use %s address for manager grpc client", netAddr.Addr)
return New(netAddr.Addr)
}
logger.Warnf("%s address can not reachable", netAddr.Addr)
}

return nil, errors.New("can not find available addresses")
}

func (c *client) GetScheduler(req *manager.GetSchedulerRequest) (*manager.Scheduler, error) {
ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
defer cancel()
Expand Down

0 comments on commit 4542d21

Please sign in to comment.