Skip to content

Commit

Permalink
feat: merge sync peer with peer table in manager (#2668)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Aug 24, 2023
1 parent 63392f2 commit fe28ba4
Show file tree
Hide file tree
Showing 21 changed files with 363 additions and 92 deletions.
52 changes: 33 additions & 19 deletions client/daemon/announcer/announcer.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,15 @@ type Announcer interface {

// announcer provides announce function.
type announcer struct {
config *config.DaemonOption
dynconfig config.Dynconfig
hostID string
daemonPort int32
daemonDownloadPort int32
schedulerClient schedulerclient.V1
managerClient managerclient.V1
done chan struct{}
config *config.DaemonOption
dynconfig config.Dynconfig
hostID string
daemonPort int32
daemonDownloadPort int32
daemonObjectStoragePort int32
schedulerClient schedulerclient.V1
managerClient managerclient.V1
done chan struct{}
}

// Option is a functional option for configuring the announcer.
Expand All @@ -72,6 +73,13 @@ func WithManagerClient(client managerclient.V1) Option {
}
}

// WithObjectStoragePort sets the daemonObjectStoragePort.
func WithObjectStoragePort(port int32) Option {
return func(a *announcer) {
a.daemonObjectStoragePort = port
}
}

// New returns a new Announcer interface.
func New(cfg *config.DaemonOption, dynconfig config.Dynconfig, hostID string, daemonPort int32, daemonDownloadPort int32, schedulerClient schedulerclient.V1, options ...Option) Announcer {
a := &announcer{
Expand Down Expand Up @@ -153,6 +161,11 @@ func (a *announcer) newAnnounceHostRequest() (*schedulerv1.AnnounceHostRequest,
hostType = types.HostTypeSuperSeedName
}

var objectStoragePort int32
if a.config.ObjectStorage.Enable {
objectStoragePort = a.daemonObjectStoragePort
}

pid := os.Getpid()

h, err := host.Info()
Expand Down Expand Up @@ -223,17 +236,18 @@ func (a *announcer) newAnnounceHostRequest() (*schedulerv1.AnnounceHostRequest,
}

return &schedulerv1.AnnounceHostRequest{
Id: a.hostID,
Type: hostType,
Hostname: a.config.Host.Hostname,
Ip: a.config.Host.AdvertiseIP.String(),
Port: a.daemonPort,
DownloadPort: a.daemonDownloadPort,
Os: h.OS,
Platform: h.Platform,
PlatformFamily: h.PlatformFamily,
PlatformVersion: h.PlatformVersion,
KernelVersion: h.KernelVersion,
Id: a.hostID,
Type: hostType,
Hostname: a.config.Host.Hostname,
Ip: a.config.Host.AdvertiseIP.String(),
Port: a.daemonPort,
DownloadPort: a.daemonDownloadPort,
ObjectStoragePort: objectStoragePort,
Os: h.OS,
Platform: h.Platform,
PlatformFamily: h.PlatformFamily,
PlatformVersion: h.PlatformVersion,
KernelVersion: h.KernelVersion,
Cpu: &schedulerv1.CPU{
LogicalCount: uint32(cpuLogicalCount),
PhysicalCount: uint32(cpuPhysicalCount),
Expand Down
12 changes: 10 additions & 2 deletions client/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,12 +562,15 @@ func (cd *clientDaemon) Serve() error {
cd.schedPeerHost.DownPort = int32(uploadPort)

// prepare object storage service listen
var objectStorageListener net.Listener
var (
objectStorageListener net.Listener
objectStoragePort int
)
if cd.Option.ObjectStorage.Enable {
if cd.Option.ObjectStorage.TCPListen == nil {
return errors.New("object storage tcp listen option is empty")
}
objectStorageListener, _, err = cd.prepareTCPListener(cd.Option.ObjectStorage.ListenOption, true)
objectStorageListener, objectStoragePort, err = cd.prepareTCPListener(cd.Option.ObjectStorage.ListenOption, true)
if err != nil {
logger.Errorf("failed to listen for object storage service: %v", err)
return err
Expand Down Expand Up @@ -678,6 +681,11 @@ func (cd *clientDaemon) Serve() error {
if cd.managerClient != nil {
announcerOptions = append(announcerOptions, announcer.WithManagerClient(cd.managerClient))
}

if cd.Option.ObjectStorage.Enable {
announcerOptions = append(announcerOptions, announcer.WithObjectStoragePort(int32(objectStoragePort)))
}

cd.announcer = announcer.New(&cd.Option, cd.dynconfig, cd.schedPeerHost.Id, cd.schedPeerHost.RpcPort,
cd.schedPeerHost.DownPort, cd.schedulerClient, announcerOptions...)
go func() {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
go 1.20

require (
d7y.io/api/v2 v2.0.21
d7y.io/api/v2 v2.0.23
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.6
github.com/Showmax/go-fqdn v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
d7y.io/api/v2 v2.0.21 h1:g/hiw1KhkroQjqsnetGdjutEJv4zsNY8LoM2jDM2UYg=
d7y.io/api/v2 v2.0.21/go.mod h1:lwCvFjtRVsyTKsiXfh2W0Jdv+5tQGR/vFj+TknwnusY=
d7y.io/api/v2 v2.0.23 h1:s0vDhh5P1jfKO/dee2DhQiUG7eFORVF2/M9O9SLRNQI=
d7y.io/api/v2 v2.0.23/go.mod h1:lwCvFjtRVsyTKsiXfh2W0Jdv+5tQGR/vFj+TknwnusY=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
Expand Down
6 changes: 6 additions & 0 deletions internal/dflog/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,12 @@ func With(args ...any) *SugaredLoggerOnWith {
}
}

func WithScheduler(hostname, ip string, clusterID uint64) *SugaredLoggerOnWith {
return &SugaredLoggerOnWith{
withArgs: []any{"hostname", hostname, "ip", ip, "clusterID", clusterID},
}
}

func WithPeer(hostID, taskID, peerID string) *SugaredLoggerOnWith {
return &SugaredLoggerOnWith{
withArgs: []any{"hostID", hostID, "taskID", taskID, "peerID", peerID},
Expand Down
10 changes: 10 additions & 0 deletions internal/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,15 @@ func (t *Job) GetGroupJobState(groupID string) (*GroupJobState, error) {
}, nil
}

func MarshalResponse(v any) (string, error) {
b, err := json.Marshal(v)
if err != nil {
return "", err
}

return string(b), nil
}

func MarshalRequest(v any) ([]machineryv1tasks.Arg, error) {
b, err := json.Marshal(v)
if err != nil {
Expand All @@ -169,6 +178,7 @@ func UnmarshalResponse(data []reflect.Value, v any) error {
if err := json.Unmarshal([]byte(data[0].String()), v); err != nil {
return err
}

return nil
}

Expand Down
6 changes: 3 additions & 3 deletions internal/job/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/stretchr/testify/assert"
)

func TestJobMarshal(t *testing.T) {
func TestJob_MarshalRequest(t *testing.T) {
tests := []struct {
name string
value any
Expand Down Expand Up @@ -110,7 +110,7 @@ func TestJobMarshal(t *testing.T) {
}
}

func TestJobUnmarshal(t *testing.T) {
func TestJob_UnmarshalResponse(t *testing.T) {
tests := []struct {
name string
data []reflect.Value
Expand Down Expand Up @@ -206,7 +206,7 @@ func TestJobUnmarshal(t *testing.T) {
}
}

func TestUnmarshalRequest(t *testing.T) {
func TestJob_UnmarshalRequest(t *testing.T) {
tests := []struct {
name string
data string
Expand Down
2 changes: 1 addition & 1 deletion internal/job/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/stretchr/testify/assert"
)

func TestJobGetSchedulerQueue(t *testing.T) {
func TestJob_GetSchedulerQueue(t *testing.T) {
tests := []struct {
name string
clusterID uint
Expand Down
5 changes: 5 additions & 0 deletions manager/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ func New() *Config {
},
SyncPeers: SyncPeersConfig{
Interval: DefaultJobSyncPeersInterval,
Timeout: DefaultJobSyncPeersTimeout,
},
},
ObjectStorage: ObjectStorageConfig{
Expand Down Expand Up @@ -626,6 +627,10 @@ func (cfg *Config) Validate() error {
return errors.New("syncPeers requires parameter interval and it must be greater than 12 hours")
}

if cfg.Job.SyncPeers.Timeout == 0 {
return errors.New("syncPeers requires parameter timeout")
}

if cfg.ObjectStorage.Enable {
if cfg.ObjectStorage.Name == "" {
return errors.New("objectStorage requires parameter name")
Expand Down
16 changes: 16 additions & 0 deletions manager/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ func TestConfig_Load(t *testing.T) {
},
SyncPeers: SyncPeersConfig{
Interval: 13 * time.Hour,
Timeout: 2 * time.Minute,
},
},
ObjectStorage: ObjectStorageConfig{
Expand Down Expand Up @@ -759,6 +760,21 @@ func TestConfig_Validate(t *testing.T) {
assert.EqualError(err, "syncPeers requires parameter interval and it must be greater than 12 hours")
},
},
{
name: "syncPeers requires parameter timeout",
config: New(),
mock: func(cfg *Config) {
cfg.Auth.JWT = mockJWTConfig
cfg.Database.Type = DatabaseTypeMysql
cfg.Database.Mysql = mockMysqlConfig
cfg.Database.Redis = mockRedisConfig
cfg.Job.SyncPeers.Timeout = 0
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.EqualError(err, "syncPeers requires parameter timeout")
},
},
{
name: "objectStorage requires parameter name",
config: New(),
Expand Down
3 changes: 3 additions & 0 deletions manager/config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ const (

// MinJobSyncPeersInterval is the min interval for syncing all peers information from the scheduler.
MinJobSyncPeersInterval = 12 * time.Hour

// DefaultJobSyncPeersTimeout is the default timeout for syncing all peers information from the scheduler.
DefaultJobSyncPeersTimeout = 10 * time.Minute
)

const (
Expand Down
1 change: 1 addition & 0 deletions manager/config/testdata/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ job:
caCert: testdata/ca.crt
syncPeers:
interval: 13h
timeout: 2m

objectStorage:
enable: true
Expand Down
15 changes: 15 additions & 0 deletions manager/job/mocks/sync_peers_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit fe28ba4

Please sign in to comment.