Skip to content

Commit

Permalink
feat: peer announces scheduler cluster id to scheduler (#2652)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Aug 21, 2023
1 parent 80f88d3 commit f3b9290
Show file tree
Hide file tree
Showing 16 changed files with 254 additions and 162 deletions.
3 changes: 3 additions & 0 deletions client/config/dynconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ type Dynconfig interface {
// Get the dynamic schedulers config.
GetSchedulers() ([]*managerv1.Scheduler, error)

// Get the dynamic schedulers cluster id.
GetSchedulerClusterID() uint64

// Get the dynamic object storage config.
GetObjectStorage() (*managerv1.ObjectStorage, error)

Expand Down
6 changes: 6 additions & 0 deletions client/config/dynconfig_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ func (d *dynconfigLocal) GetSchedulers() ([]*managerv1.Scheduler, error) {
return nil, ErrUnimplemented
}

// Get the dynamic schedulers cluster id. The local dynamic configuration does not support
// get the scheduler cluster id.
func (d *dynconfigLocal) GetSchedulerClusterID() uint64 {
return 0
}

// Get the dynamic object storage config from local.
func (d *dynconfigLocal) GetObjectStorage() (*managerv1.ObjectStorage, error) {
return nil, ErrUnimplemented
Expand Down
7 changes: 7 additions & 0 deletions client/config/dynconfig_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type dynconfigManager struct {
done chan struct{}
cachePath string
transportCredentials credentials.TransportCredentials
schedulerClusterID uint64
}

// newDynconfigManager returns a new manager dynconfig instence.
Expand Down Expand Up @@ -147,6 +148,7 @@ func (d *dynconfigManager) GetResolveSchedulerAddrs() ([]resolver.Address, error
return nil, errors.New("can not found available scheduler addresses")
}

d.schedulerClusterID = schedulerClusterID
return resolveAddrs, nil
}

Expand All @@ -168,6 +170,11 @@ func (d *dynconfigManager) GetSchedulers() ([]*managerv1.Scheduler, error) {
return data.Schedulers, nil
}

// Get the dynamic schedulers cluster id.
func (d *dynconfigManager) GetSchedulerClusterID() uint64 {
return d.schedulerClusterID
}

// Get the dynamic object storage config from manager.
func (d *dynconfigManager) GetObjectStorage() (*managerv1.ObjectStorage, error) {
data, err := d.Get()
Expand Down
14 changes: 14 additions & 0 deletions client/config/mocks/dynconfig_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion client/daemon/announcer/announcer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type Announcer interface {
// announcer provides announce function.
type announcer struct {
config *config.DaemonOption
dynconfig config.Dynconfig
hostID string
daemonPort int32
daemonDownloadPort int32
Expand All @@ -72,9 +73,10 @@ func WithManagerClient(client managerclient.V1) Option {
}

// New returns a new Announcer interface.
func New(cfg *config.DaemonOption, hostID string, daemonPort int32, daemonDownloadPort int32, schedulerClient schedulerclient.V1, options ...Option) Announcer {
func New(cfg *config.DaemonOption, dynconfig config.Dynconfig, hostID string, daemonPort int32, daemonDownloadPort int32, schedulerClient schedulerclient.V1, options ...Option) Announcer {
a := &announcer{
config: cfg,
dynconfig: dynconfig,
hostID: hostID,
daemonPort: daemonPort,
daemonDownloadPort: daemonDownloadPort,
Expand Down Expand Up @@ -280,6 +282,7 @@ func (a *announcer) newAnnounceHostRequest() (*schedulerv1.AnnounceHostRequest,
GoVersion: version.GoVersion,
Platform: version.Platform,
},
SchedulerClusterId: a.dynconfig.GetSchedulerClusterID(),
}, nil
}

Expand Down
4 changes: 3 additions & 1 deletion client/daemon/announcer/announcer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/assert"

"d7y.io/dragonfly/v2/client/config"
configmocks "d7y.io/dragonfly/v2/client/config/mocks"
managerclientmocks "d7y.io/dragonfly/v2/pkg/rpc/manager/client/mocks"
schedulerclientmocks "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client/mocks"
)
Expand Down Expand Up @@ -78,7 +79,8 @@ func TestAnnouncer_New(t *testing.T) {
defer ctl.Finish()
mockManagerClient := managerclientmocks.NewMockV1(ctl)
mockSchedulerClient := schedulerclientmocks.NewMockV1(ctl)
tc.expect(t, New(tc.config, tc.hostID, tc.deamonPort, tc.deamonDownloadPort, mockSchedulerClient, WithManagerClient(mockManagerClient)))
mockDynconfig := configmocks.NewMockDynconfig(ctl)
tc.expect(t, New(tc.config, mockDynconfig, tc.hostID, tc.deamonPort, tc.deamonDownloadPort, mockSchedulerClient, WithManagerClient(mockManagerClient)))
})
}
}
2 changes: 1 addition & 1 deletion client/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,7 @@ func (cd *clientDaemon) Serve() error {
if cd.managerClient != nil {
announcerOptions = append(announcerOptions, announcer.WithManagerClient(cd.managerClient))
}
cd.announcer = announcer.New(&cd.Option, cd.schedPeerHost.Id, cd.schedPeerHost.RpcPort,
cd.announcer = announcer.New(&cd.Option, cd.dynconfig, cd.schedPeerHost.Id, cd.schedPeerHost.RpcPort,
cd.schedPeerHost.DownPort, cd.schedulerClient, announcerOptions...)
go func() {
logger.Info("serve announcer")
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
go 1.20

require (
d7y.io/api/v2 v2.0.19
d7y.io/api/v2 v2.0.21
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.6
github.com/Showmax/go-fqdn v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
d7y.io/api/v2 v2.0.19 h1:jTHEp/JW8hhjGCzCzGGNbpcxnxofNvoeUTwDfv3CeDM=
d7y.io/api/v2 v2.0.19/go.mod h1:lwCvFjtRVsyTKsiXfh2W0Jdv+5tQGR/vFj+TknwnusY=
d7y.io/api/v2 v2.0.21 h1:g/hiw1KhkroQjqsnetGdjutEJv4zsNY8LoM2jDM2UYg=
d7y.io/api/v2 v2.0.21/go.mod h1:lwCvFjtRVsyTKsiXfh2W0Jdv+5tQGR/vFj+TknwnusY=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
Expand Down
15 changes: 12 additions & 3 deletions scheduler/resource/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ import (
// HostOption is a functional option for configuring the host.
type HostOption func(h *Host)

// WithSchedulerClusterID sets host's SchedulerClusterID.
func WithSchedulerClusterID(id uint64) HostOption {
return func(h *Host) {
h.SchedulerClusterID = id
}
}

// WithConcurrentUploadLimit sets host's ConcurrentUploadLimit.
func WithConcurrentUploadLimit(limit int32) HostOption {
return func(h *Host) {
Expand Down Expand Up @@ -158,6 +165,9 @@ type Host struct {
// Build information.
Build Build

// SchedulerClusterID is the scheduler cluster id matched by scopes.
SchedulerClusterID uint64

// ConcurrentUploadLimit is concurrent upload limit count.
ConcurrentUploadLimit *atomic.Int32

Expand Down Expand Up @@ -317,9 +327,8 @@ type Disk struct {

// New host instance.
func NewHost(
id, ip, hostname string,
port, downloadPort int32, typ types.HostType,
options ...HostOption,
id, ip, hostname string, port, downloadPort int32,
typ types.HostType, options ...HostOption,
) *Host {
// Calculate default of the concurrent upload limit by host type.
concurrentUploadLimit := config.DefaultSeedPeerConcurrentUploadLimit
Expand Down
36 changes: 36 additions & 0 deletions scheduler/resource/host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.IP, mockRawHost.IP)
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
Expand All @@ -172,6 +173,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.IP, mockRawSeedHost.IP)
assert.Equal(host.Port, mockRawSeedHost.Port)
assert.Equal(host.DownloadPort, mockRawSeedHost.DownloadPort)
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultSeedPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
Expand All @@ -183,6 +185,30 @@ func TestHost_NewHost(t *testing.T) {
assert.NotNil(host.Log)
},
},
{
name: "new host and set scheduler cluster id",
rawHost: mockRawHost,
options: []HostOption{WithSchedulerClusterID(1)},
expect: func(t *testing.T, host *Host) {
assert := assert.New(t)
assert.Equal(host.ID, mockRawHost.ID)
assert.Equal(host.Type, types.HostTypeNormal)
assert.Equal(host.Hostname, mockRawHost.Hostname)
assert.Equal(host.IP, mockRawHost.IP)
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.SchedulerClusterID, uint64(1))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
assert.Equal(host.UploadFailedCount.Load(), int64(0))
assert.NotNil(host.Peers)
assert.Equal(host.PeerCount.Load(), int32(0))
assert.NotEmpty(host.CreatedAt.Load())
assert.NotEmpty(host.UpdatedAt.Load())
assert.NotNil(host.Log)
},
},
{
name: "new host and set upload loadlimit",
rawHost: mockRawHost,
Expand All @@ -195,6 +221,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.IP, mockRawHost.IP)
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(200))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
Expand All @@ -219,6 +246,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.OS, "linux")
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
Expand All @@ -243,6 +271,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.Platform, "ubuntu")
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
Expand All @@ -267,6 +296,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.PlatformFamily, "debian")
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
Expand Down Expand Up @@ -315,6 +345,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.KernelVersion, "5.15.0-27-generic")
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
Expand All @@ -339,6 +370,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.EqualValues(host.CPU, mockCPU)
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
Expand All @@ -363,6 +395,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.EqualValues(host.Memory, mockMemory)
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
Expand All @@ -387,6 +420,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.EqualValues(host.Network, mockNetwork)
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
Expand All @@ -411,6 +445,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.EqualValues(host.Disk, mockDisk)
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
Expand All @@ -435,6 +470,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.EqualValues(host.Build, mockBuild)
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
Expand Down
Loading

0 comments on commit f3b9290

Please sign in to comment.