From f3b92900afd9ced97a99c9a95971d69f818afa4d Mon Sep 17 00:00:00 2001 From: Gaius Date: Mon, 21 Aug 2023 22:00:05 +0800 Subject: [PATCH] feat: peer announces scheduler cluster id to scheduler (#2652) Signed-off-by: Gaius --- client/config/dynconfig.go | 3 + client/config/dynconfig_local.go | 6 + client/config/dynconfig_manager.go | 7 + client/config/mocks/dynconfig_mock.go | 14 + client/daemon/announcer/announcer.go | 5 +- client/daemon/announcer/announcer_test.go | 4 +- client/daemon/daemon.go | 2 +- go.mod | 2 +- go.sum | 4 +- scheduler/resource/host.go | 15 +- scheduler/resource/host_test.go | 36 +++ scheduler/service/service_v1.go | 304 +++++++++++----------- scheduler/service/service_v2.go | 4 + scheduler/storage/storage_test.go | 5 +- scheduler/storage/types.go | 3 + trainer/storage/testdata/download.csv | 2 +- 16 files changed, 254 insertions(+), 162 deletions(-) diff --git a/client/config/dynconfig.go b/client/config/dynconfig.go index 8c5356fb1d6..1ba82403ac8 100644 --- a/client/config/dynconfig.go +++ b/client/config/dynconfig.go @@ -55,6 +55,9 @@ type Dynconfig interface { // Get the dynamic schedulers config. GetSchedulers() ([]*managerv1.Scheduler, error) + // Get the dynamic schedulers cluster id. + GetSchedulerClusterID() uint64 + // Get the dynamic object storage config. GetObjectStorage() (*managerv1.ObjectStorage, error) diff --git a/client/config/dynconfig_local.go b/client/config/dynconfig_local.go index b8ca73840a1..81acc317352 100644 --- a/client/config/dynconfig_local.go +++ b/client/config/dynconfig_local.go @@ -104,6 +104,12 @@ func (d *dynconfigLocal) GetSchedulers() ([]*managerv1.Scheduler, error) { return nil, ErrUnimplemented } +// Get the dynamic schedulers cluster id. The local dynamic configuration does not support +// get the scheduler cluster id. +func (d *dynconfigLocal) GetSchedulerClusterID() uint64 { + return 0 +} + // Get the dynamic object storage config from local. func (d *dynconfigLocal) GetObjectStorage() (*managerv1.ObjectStorage, error) { return nil, ErrUnimplemented diff --git a/client/config/dynconfig_manager.go b/client/config/dynconfig_manager.go index 4812d6b7a53..add17843eac 100644 --- a/client/config/dynconfig_manager.go +++ b/client/config/dynconfig_manager.go @@ -52,6 +52,7 @@ type dynconfigManager struct { done chan struct{} cachePath string transportCredentials credentials.TransportCredentials + schedulerClusterID uint64 } // newDynconfigManager returns a new manager dynconfig instence. @@ -147,6 +148,7 @@ func (d *dynconfigManager) GetResolveSchedulerAddrs() ([]resolver.Address, error return nil, errors.New("can not found available scheduler addresses") } + d.schedulerClusterID = schedulerClusterID return resolveAddrs, nil } @@ -168,6 +170,11 @@ func (d *dynconfigManager) GetSchedulers() ([]*managerv1.Scheduler, error) { return data.Schedulers, nil } +// Get the dynamic schedulers cluster id. +func (d *dynconfigManager) GetSchedulerClusterID() uint64 { + return d.schedulerClusterID +} + // Get the dynamic object storage config from manager. 
func (d *dynconfigManager) GetObjectStorage() (*managerv1.ObjectStorage, error) { data, err := d.Get() diff --git a/client/config/mocks/dynconfig_mock.go b/client/config/mocks/dynconfig_mock.go index d693cf2b7c6..d0afb0177c3 100644 --- a/client/config/mocks/dynconfig_mock.go +++ b/client/config/mocks/dynconfig_mock.go @@ -93,6 +93,20 @@ func (mr *MockDynconfigMockRecorder) GetResolveSchedulerAddrs() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetResolveSchedulerAddrs", reflect.TypeOf((*MockDynconfig)(nil).GetResolveSchedulerAddrs)) } +// GetSchedulerClusterID mocks base method. +func (m *MockDynconfig) GetSchedulerClusterID() uint64 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetSchedulerClusterID") + ret0, _ := ret[0].(uint64) + return ret0 +} + +// GetSchedulerClusterID indicates an expected call of GetSchedulerClusterID. +func (mr *MockDynconfigMockRecorder) GetSchedulerClusterID() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSchedulerClusterID", reflect.TypeOf((*MockDynconfig)(nil).GetSchedulerClusterID)) +} + // GetSchedulers mocks base method. func (m *MockDynconfig) GetSchedulers() ([]*manager.Scheduler, error) { m.ctrl.T.Helper() diff --git a/client/daemon/announcer/announcer.go b/client/daemon/announcer/announcer.go index 45a76523095..51f8e532fa7 100644 --- a/client/daemon/announcer/announcer.go +++ b/client/daemon/announcer/announcer.go @@ -53,6 +53,7 @@ type Announcer interface { // announcer provides announce function. type announcer struct { config *config.DaemonOption + dynconfig config.Dynconfig hostID string daemonPort int32 daemonDownloadPort int32 @@ -72,9 +73,10 @@ func WithManagerClient(client managerclient.V1) Option { } // New returns a new Announcer interface. 
-func New(cfg *config.DaemonOption, hostID string, daemonPort int32, daemonDownloadPort int32, schedulerClient schedulerclient.V1, options ...Option) Announcer { +func New(cfg *config.DaemonOption, dynconfig config.Dynconfig, hostID string, daemonPort int32, daemonDownloadPort int32, schedulerClient schedulerclient.V1, options ...Option) Announcer { a := &announcer{ config: cfg, + dynconfig: dynconfig, hostID: hostID, daemonPort: daemonPort, daemonDownloadPort: daemonDownloadPort, @@ -280,6 +282,7 @@ func (a *announcer) newAnnounceHostRequest() (*schedulerv1.AnnounceHostRequest, GoVersion: version.GoVersion, Platform: version.Platform, }, + SchedulerClusterId: a.dynconfig.GetSchedulerClusterID(), }, nil } diff --git a/client/daemon/announcer/announcer_test.go b/client/daemon/announcer/announcer_test.go index b8796200b78..9bcc82c11af 100644 --- a/client/daemon/announcer/announcer_test.go +++ b/client/daemon/announcer/announcer_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" "d7y.io/dragonfly/v2/client/config" + configmocks "d7y.io/dragonfly/v2/client/config/mocks" managerclientmocks "d7y.io/dragonfly/v2/pkg/rpc/manager/client/mocks" schedulerclientmocks "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client/mocks" ) @@ -78,7 +79,8 @@ func TestAnnouncer_New(t *testing.T) { defer ctl.Finish() mockManagerClient := managerclientmocks.NewMockV1(ctl) mockSchedulerClient := schedulerclientmocks.NewMockV1(ctl) - tc.expect(t, New(tc.config, tc.hostID, tc.deamonPort, tc.deamonDownloadPort, mockSchedulerClient, WithManagerClient(mockManagerClient))) + mockDynconfig := configmocks.NewMockDynconfig(ctl) + tc.expect(t, New(tc.config, mockDynconfig, tc.hostID, tc.deamonPort, tc.deamonDownloadPort, mockSchedulerClient, WithManagerClient(mockManagerClient))) }) } } diff --git a/client/daemon/daemon.go b/client/daemon/daemon.go index 3a61b7548b3..9b5b8acb9c0 100644 --- a/client/daemon/daemon.go +++ b/client/daemon/daemon.go @@ -678,7 +678,7 @@ func (cd *clientDaemon) Serve() error { if cd.managerClient != nil { announcerOptions = append(announcerOptions, announcer.WithManagerClient(cd.managerClient)) } - cd.announcer = announcer.New(&cd.Option, cd.schedPeerHost.Id, cd.schedPeerHost.RpcPort, + cd.announcer = announcer.New(&cd.Option, cd.dynconfig, cd.schedPeerHost.Id, cd.schedPeerHost.RpcPort, cd.schedPeerHost.DownPort, cd.schedulerClient, announcerOptions...) 
go func() { logger.Info("serve announcer") diff --git a/go.mod b/go.mod index 9beb92d4e0c..7bb5dc27ced 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2 go 1.20 require ( - d7y.io/api/v2 v2.0.19 + d7y.io/api/v2 v2.0.21 github.com/MysteriousPotato/go-lockable v1.0.0 github.com/RichardKnop/machinery v1.10.6 github.com/Showmax/go-fqdn v1.0.0 diff --git a/go.sum b/go.sum index 15985319dfc..c7dbee84674 100644 --- a/go.sum +++ b/go.sum @@ -50,8 +50,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo= -d7y.io/api/v2 v2.0.19 h1:jTHEp/JW8hhjGCzCzGGNbpcxnxofNvoeUTwDfv3CeDM= -d7y.io/api/v2 v2.0.19/go.mod h1:lwCvFjtRVsyTKsiXfh2W0Jdv+5tQGR/vFj+TknwnusY= +d7y.io/api/v2 v2.0.21 h1:g/hiw1KhkroQjqsnetGdjutEJv4zsNY8LoM2jDM2UYg= +d7y.io/api/v2 v2.0.21/go.mod h1:lwCvFjtRVsyTKsiXfh2W0Jdv+5tQGR/vFj+TknwnusY= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= diff --git a/scheduler/resource/host.go b/scheduler/resource/host.go index bb637a2bfd7..7ca493eda6c 100644 --- a/scheduler/resource/host.go +++ b/scheduler/resource/host.go @@ -31,6 +31,13 @@ import ( // HostOption is a functional option for configuring the host. type HostOption func(h *Host) +// WithSchedulerClusterID sets host's SchedulerClusterID. +func WithSchedulerClusterID(id uint64) HostOption { + return func(h *Host) { + h.SchedulerClusterID = id + } +} + // WithConcurrentUploadLimit sets host's ConcurrentUploadLimit. func WithConcurrentUploadLimit(limit int32) HostOption { return func(h *Host) { @@ -158,6 +165,9 @@ type Host struct { // Build information. Build Build + // SchedulerClusterID is the scheduler cluster id matched by scopes. + SchedulerClusterID uint64 + // ConcurrentUploadLimit is concurrent upload limit count. ConcurrentUploadLimit *atomic.Int32 @@ -317,9 +327,8 @@ type Disk struct { // New host instance. func NewHost( - id, ip, hostname string, - port, downloadPort int32, typ types.HostType, - options ...HostOption, + id, ip, hostname string, port, downloadPort int32, + typ types.HostType, options ...HostOption, ) *Host { // Calculate default of the concurrent upload limit by host type. 
concurrentUploadLimit := config.DefaultSeedPeerConcurrentUploadLimit diff --git a/scheduler/resource/host_test.go b/scheduler/resource/host_test.go index 898f6244da5..042619665ce 100644 --- a/scheduler/resource/host_test.go +++ b/scheduler/resource/host_test.go @@ -150,6 +150,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.IP, mockRawHost.IP) assert.Equal(host.Port, mockRawHost.Port) assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) + assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -172,6 +173,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.IP, mockRawSeedHost.IP) assert.Equal(host.Port, mockRawSeedHost.Port) assert.Equal(host.DownloadPort, mockRawSeedHost.DownloadPort) + assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultSeedPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -183,6 +185,30 @@ func TestHost_NewHost(t *testing.T) { assert.NotNil(host.Log) }, }, + { + name: "new host and set scheduler cluster id", + rawHost: mockRawHost, + options: []HostOption{WithSchedulerClusterID(1)}, + expect: func(t *testing.T, host *Host) { + assert := assert.New(t) + assert.Equal(host.ID, mockRawHost.ID) + assert.Equal(host.Type, types.HostTypeNormal) + assert.Equal(host.Hostname, mockRawHost.Hostname) + assert.Equal(host.IP, mockRawHost.IP) + assert.Equal(host.Port, mockRawHost.Port) + assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) + assert.Equal(host.SchedulerClusterID, uint64(1)) + assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) + assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) + assert.Equal(host.UploadCount.Load(), int64(0)) + assert.Equal(host.UploadFailedCount.Load(), int64(0)) + assert.NotNil(host.Peers) + assert.Equal(host.PeerCount.Load(), int32(0)) + assert.NotEmpty(host.CreatedAt.Load()) + assert.NotEmpty(host.UpdatedAt.Load()) + assert.NotNil(host.Log) + }, + }, { name: "new host and set upload loadlimit", rawHost: mockRawHost, @@ -195,6 +221,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.IP, mockRawHost.IP) assert.Equal(host.Port, mockRawHost.Port) assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) + assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(200)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -219,6 +246,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.Port, mockRawHost.Port) assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) assert.Equal(host.OS, "linux") + assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -243,6 +271,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.Port, mockRawHost.Port) assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) assert.Equal(host.Platform, "ubuntu") + assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) 
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -267,6 +296,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.Port, mockRawHost.Port) assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) assert.Equal(host.PlatformFamily, "debian") + assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -315,6 +345,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.Port, mockRawHost.Port) assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) assert.Equal(host.KernelVersion, "5.15.0-27-generic") + assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -339,6 +370,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.Port, mockRawHost.Port) assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) assert.EqualValues(host.CPU, mockCPU) + assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -363,6 +395,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.Port, mockRawHost.Port) assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) assert.EqualValues(host.Memory, mockMemory) + assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -387,6 +420,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.Port, mockRawHost.Port) assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) assert.EqualValues(host.Network, mockNetwork) + assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -411,6 +445,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.Port, mockRawHost.Port) assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) assert.EqualValues(host.Disk, mockDisk) + assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -435,6 +470,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.Port, mockRawHost.Port) assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) assert.EqualValues(host.Build, mockBuild) + assert.Equal(host.SchedulerClusterID, uint64(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) diff --git a/scheduler/service/service_v1.go b/scheduler/service/service_v1.go index 4b44ebf6c1e..fa9863103aa 100644 --- a/scheduler/service/service_v1.go +++ b/scheduler/service/service_v1.go @@ -92,12 +92,12 @@ func NewV1( // RegisterPeerTask registers peer and 
triggers seed peer download task. func (v *V1) RegisterPeerTask(ctx context.Context, req *schedulerv1.PeerTaskRequest) (*schedulerv1.RegisterResult, error) { - logger.WithPeer(req.PeerHost.Id, req.TaskId, req.PeerId).Infof("register peer task request: %#v", req) + logger.WithPeer(req.PeerHost.GetId(), req.GetTaskId(), req.GetPeerId()).Infof("register peer task request: %#v", req) // Store resource. task := v.storeTask(ctx, req, commonv2.TaskType_DFDAEMON) - host := v.storeHost(ctx, req.PeerHost) - peer := v.storePeer(ctx, req.PeerId, req.UrlMeta.Priority, req.UrlMeta.Range, task, host) + host := v.storeHost(ctx, req.GetPeerHost()) + peer := v.storePeer(ctx, req.GetPeerId(), req.UrlMeta.GetPriority(), req.UrlMeta.GetRange(), task, host) // Trigger the first download of the task. if err := v.triggerTask(ctx, req, task, host, peer, v.dynconfig); err != nil { @@ -281,11 +281,11 @@ func (v *V1) ReportPieceResult(stream schedulerv1.Scheduler_ReportPieceResultSer // ReportPeerResult handles peer result reported by dfdaemon. func (v *V1) ReportPeerResult(ctx context.Context, req *schedulerv1.PeerResult) error { - logger.WithTaskAndPeerID(req.TaskId, req.PeerId).Infof("report peer result request: %#v", req) + logger.WithTaskAndPeerID(req.GetTaskId(), req.GetPeerId()).Infof("report peer result request: %#v", req) - peer, loaded := v.resource.PeerManager().Load(req.PeerId) + peer, loaded := v.resource.PeerManager().Load(req.GetPeerId()) if !loaded { - msg := fmt.Sprintf("peer %s not found", req.PeerId) + msg := fmt.Sprintf("peer %s not found", req.GetPeerId()) logger.Error(msg) return dferrors.New(commonv1.Code_SchedPeerNotFound, msg) } @@ -296,7 +296,7 @@ func (v *V1) ReportPeerResult(ctx context.Context, req *schedulerv1.PeerResult) peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() parents := peer.Parents() - if !req.Success { + if !req.GetSuccess() { peer.Log.Error("report failed peer") if peer.FSM.Is(resource.PeerStateBackToSource) { // Collect DownloadPeerBackToSourceFailureCount metrics. @@ -324,12 +324,12 @@ func (v *V1) ReportPeerResult(ctx context.Context, req *schedulerv1.PeerResult) v.handleTaskSuccess(ctx, peer.Task, req) v.handlePeerSuccess(ctx, peer) metrics.DownloadPeerDuration.WithLabelValues(priority.String(), peer.Task.Type.String(), - peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Observe(float64(req.Cost)) + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Observe(float64(req.GetCost())) return nil } metrics.DownloadPeerDuration.WithLabelValues(priority.String(), peer.Task.Type.String(), - peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Observe(float64(req.Cost)) + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Observe(float64(req.GetCost())) go v.createDownloadRecord(peer, parents, req) v.handlePeerSuccess(ctx, peer) @@ -338,22 +338,22 @@ func (v *V1) ReportPeerResult(ctx context.Context, req *schedulerv1.PeerResult) // AnnounceTask informs scheduler a peer has completed task. 
func (v *V1) AnnounceTask(ctx context.Context, req *schedulerv1.AnnounceTaskRequest) error { - logger.WithPeer(req.PeerHost.Id, req.TaskId, req.PiecePacket.DstPid).Infof("announce task request: %#v %#v %#v %#v", - req, req.UrlMeta, req.PeerHost, req.PiecePacket, + logger.WithPeer(req.PeerHost.GetId(), req.GetTaskId(), req.PiecePacket.GetDstPid()).Infof("announce task request: %#v %#v %#v %#v", + req, req.GetUrlMeta(), req.GetPeerHost(), req.GetPiecePacket(), ) - taskID := req.TaskId - peerID := req.PiecePacket.DstPid + taskID := req.GetTaskId() + peerID := req.PiecePacket.GetDstPid() options := []resource.TaskOption{} - if d, err := digest.Parse(req.UrlMeta.Digest); err == nil { + if d, err := digest.Parse(req.UrlMeta.GetDigest()); err == nil { options = append(options, resource.WithDigest(d)) } - task := resource.NewTask(taskID, req.Url, req.UrlMeta.Tag, req.UrlMeta.Application, types.TaskTypeV1ToV2(req.TaskType), - strings.Split(req.UrlMeta.Filter, idgen.URLFilterSeparator), req.UrlMeta.Header, int32(v.config.Scheduler.BackToSourceCount), options...) + task := resource.NewTask(taskID, req.GetUrl(), req.UrlMeta.GetTag(), req.UrlMeta.GetApplication(), types.TaskTypeV1ToV2(req.GetTaskType()), + strings.Split(req.UrlMeta.GetFilter(), idgen.URLFilterSeparator), req.UrlMeta.GetHeader(), int32(v.config.Scheduler.BackToSourceCount), options...) task, _ = v.resource.TaskManager().LoadOrStore(task) - host := v.storeHost(ctx, req.PeerHost) - peer := v.storePeer(ctx, peerID, req.UrlMeta.Priority, req.UrlMeta.Range, task, host) + host := v.storeHost(ctx, req.GetPeerHost()) + peer := v.storePeer(ctx, peerID, req.UrlMeta.GetPriority(), req.UrlMeta.GetRange(), task, host) // If the task state is not TaskStateSucceeded, // advance the task state to TaskStateSucceeded. @@ -367,10 +367,10 @@ func (v *V1) AnnounceTask(ctx context.Context, req *schedulerv1.AnnounceTaskRequ } // Construct piece. - for _, pieceInfo := range req.PiecePacket.PieceInfos { + for _, pieceInfo := range req.PiecePacket.GetPieceInfos() { piece := &resource.Piece{ Number: pieceInfo.PieceNum, - ParentID: req.PiecePacket.DstPid, + ParentID: req.PiecePacket.GetDstPid(), Offset: pieceInfo.RangeStart, Length: uint64(pieceInfo.RangeSize), TrafficType: commonv2.TrafficType_LOCAL_PEER, @@ -389,8 +389,8 @@ func (v *V1) AnnounceTask(ctx context.Context, req *schedulerv1.AnnounceTaskRequ } v.handleTaskSuccess(ctx, task, &schedulerv1.PeerResult{ - TotalPieceCount: req.PiecePacket.TotalPiece, - ContentLength: req.PiecePacket.ContentLength, + TotalPieceCount: req.PiecePacket.GetTotalPiece(), + ContentLength: req.PiecePacket.GetContentLength(), }) } @@ -423,11 +423,11 @@ func (v *V1) AnnounceTask(ctx context.Context, req *schedulerv1.AnnounceTaskRequ // StatTask checks the current state of the task. func (v *V1) StatTask(ctx context.Context, req *schedulerv1.StatTaskRequest) (*schedulerv1.Task, error) { - logger.WithTaskID(req.TaskId).Infof("stat task request: %#v", req) + logger.WithTaskID(req.GetTaskId()).Infof("stat task request: %#v", req) - task, loaded := v.resource.TaskManager().Load(req.TaskId) + task, loaded := v.resource.TaskManager().Load(req.GetTaskId()) if !loaded { - msg := fmt.Sprintf("task %s not found", req.TaskId) + msg := fmt.Sprintf("task %s not found", req.GetTaskId()) logger.Info(msg) return nil, dferrors.New(commonv1.Code_PeerTaskNotFound, msg) } @@ -445,11 +445,11 @@ func (v *V1) StatTask(ctx context.Context, req *schedulerv1.StatTaskRequest) (*s // LeaveTask releases peer in scheduler. 
func (v *V1) LeaveTask(ctx context.Context, req *schedulerv1.PeerTarget) error { - logger.WithTaskAndPeerID(req.TaskId, req.PeerId).Infof("leave task request: %#v", req) + logger.WithTaskAndPeerID(req.GetTaskId(), req.GetPeerId()).Infof("leave task request: %#v", req) - peer, loaded := v.resource.PeerManager().Load(req.PeerId) + peer, loaded := v.resource.PeerManager().Load(req.GetPeerId()) if !loaded { - msg := fmt.Sprintf("peer %s not found", req.PeerId) + msg := fmt.Sprintf("peer %s not found", req.GetPeerId()) logger.Error(msg) return dferrors.New(commonv1.Code_SchedPeerNotFound, msg) } @@ -471,87 +471,90 @@ func (v *V1) AnnounceHost(ctx context.Context, req *schedulerv1.AnnounceHostRequ concurrentUploadLimit = int32(clientConfig.LoadLimit) } - host, loaded := v.resource.HostManager().Load(req.Id) + host, loaded := v.resource.HostManager().Load(req.GetId()) if !loaded { options := []resource.HostOption{ - resource.WithOS(req.Os), - resource.WithPlatform(req.Platform), - resource.WithPlatformFamily(req.PlatformFamily), - resource.WithPlatformVersion(req.PlatformVersion), - resource.WithKernelVersion(req.KernelVersion), + resource.WithOS(req.GetOs()), + resource.WithPlatform(req.GetPlatform()), + resource.WithPlatformFamily(req.GetPlatformFamily()), + resource.WithPlatformVersion(req.GetPlatformVersion()), + resource.WithKernelVersion(req.GetKernelVersion()), } if concurrentUploadLimit > 0 { options = append(options, resource.WithConcurrentUploadLimit(concurrentUploadLimit)) } - if req.Cpu != nil { + if req.GetCpu() != nil { options = append(options, resource.WithCPU(resource.CPU{ - LogicalCount: req.Cpu.LogicalCount, - PhysicalCount: req.Cpu.PhysicalCount, - Percent: req.Cpu.Percent, - ProcessPercent: req.Cpu.ProcessPercent, + LogicalCount: req.Cpu.GetLogicalCount(), + PhysicalCount: req.Cpu.GetPhysicalCount(), + Percent: req.Cpu.GetPercent(), + ProcessPercent: req.Cpu.GetProcessPercent(), Times: resource.CPUTimes{ - User: req.Cpu.Times.User, - System: req.Cpu.Times.System, - Idle: req.Cpu.Times.Idle, - Nice: req.Cpu.Times.Nice, - Iowait: req.Cpu.Times.Iowait, - Irq: req.Cpu.Times.Irq, - Softirq: req.Cpu.Times.Softirq, - Steal: req.Cpu.Times.Steal, - Guest: req.Cpu.Times.Guest, - GuestNice: req.Cpu.Times.GuestNice, + User: req.Cpu.Times.GetUser(), + System: req.Cpu.Times.GetSystem(), + Idle: req.Cpu.Times.GetIdle(), + Nice: req.Cpu.Times.GetNice(), + Iowait: req.Cpu.Times.GetIowait(), + Irq: req.Cpu.Times.GetIrq(), + Softirq: req.Cpu.Times.GetSoftirq(), + Steal: req.Cpu.Times.GetSteal(), + Guest: req.Cpu.Times.GetGuest(), + GuestNice: req.Cpu.Times.GetGuestNice(), }, })) } - if req.Memory != nil { + if req.GetMemory() != nil { options = append(options, resource.WithMemory(resource.Memory{ - Total: req.Memory.Total, - Available: req.Memory.Available, - Used: req.Memory.Used, - UsedPercent: req.Memory.UsedPercent, - ProcessUsedPercent: req.Memory.ProcessUsedPercent, - Free: req.Memory.Free, + Total: req.Memory.GetTotal(), + Available: req.Memory.GetAvailable(), + Used: req.Memory.GetUsed(), + UsedPercent: req.Memory.GetUsedPercent(), + ProcessUsedPercent: req.Memory.GetProcessUsedPercent(), + Free: req.Memory.GetFree(), })) } - if req.Network != nil { + if req.GetNetwork() != nil { options = append(options, resource.WithNetwork(resource.Network{ - TCPConnectionCount: req.Network.TcpConnectionCount, - UploadTCPConnectionCount: req.Network.UploadTcpConnectionCount, - Location: req.Network.Location, - IDC: req.Network.Idc, + TCPConnectionCount: req.Network.GetTcpConnectionCount(), + 
UploadTCPConnectionCount: req.Network.GetUploadTcpConnectionCount(), + Location: req.Network.GetLocation(), + IDC: req.Network.GetIdc(), })) } - if req.Disk != nil { + if req.GetDisk() != nil { options = append(options, resource.WithDisk(resource.Disk{ - Total: req.Disk.Total, - Free: req.Disk.Free, - Used: req.Disk.Used, - UsedPercent: req.Disk.UsedPercent, - InodesTotal: req.Disk.InodesTotal, - InodesUsed: req.Disk.InodesUsed, - InodesFree: req.Disk.InodesFree, - InodesUsedPercent: req.Disk.InodesUsedPercent, + Total: req.Disk.GetTotal(), + Free: req.Disk.GetFree(), + Used: req.Disk.GetUsed(), + UsedPercent: req.Disk.GetUsedPercent(), + InodesTotal: req.Disk.GetInodesTotal(), + InodesUsed: req.Disk.GetInodesUsed(), + InodesFree: req.Disk.GetInodesFree(), + InodesUsedPercent: req.Disk.GetInodesUsedPercent(), })) } - if req.Build != nil { + if req.GetBuild() != nil { options = append(options, resource.WithBuild(resource.Build{ - GitVersion: req.Build.GitVersion, - GitCommit: req.Build.GitCommit, - GoVersion: req.Build.GoVersion, - Platform: req.Build.Platform, + GitVersion: req.Build.GetGitVersion(), + GitCommit: req.Build.GetGitCommit(), + GoVersion: req.Build.GetGoVersion(), + Platform: req.Build.GetPlatform(), })) } + if req.GetSchedulerClusterId() != 0 { + options = append(options, resource.WithSchedulerClusterID(req.GetSchedulerClusterId())) + } + host = resource.NewHost( - req.Id, req.Ip, req.Hostname, - req.Port, req.DownloadPort, types.ParseHostType(req.Type), - options..., + req.GetId(), req.GetIp(), req.GetHostname(), req.GetPort(), req.GetDownloadPort(), + types.ParseHostType(req.GetType()), options..., ) v.resource.HostManager().Store(host) host.Log.Infof("announce new host: %#v", req) @@ -559,80 +562,80 @@ func (v *V1) AnnounceHost(ctx context.Context, req *schedulerv1.AnnounceHostRequ } // Host already exists and updates properties. 
- host.Port = req.Port - host.DownloadPort = req.DownloadPort - host.Type = types.ParseHostType(req.Type) - host.OS = req.Os - host.Platform = req.Platform - host.PlatformFamily = req.PlatformFamily - host.PlatformVersion = req.PlatformVersion - host.KernelVersion = req.KernelVersion + host.Port = req.GetPort() + host.DownloadPort = req.GetDownloadPort() + host.Type = types.ParseHostType(req.GetType()) + host.OS = req.GetOs() + host.Platform = req.GetPlatform() + host.PlatformFamily = req.GetPlatformFamily() + host.PlatformVersion = req.GetPlatformVersion() + host.KernelVersion = req.GetKernelVersion() host.UpdatedAt.Store(time.Now()) if concurrentUploadLimit > 0 { host.ConcurrentUploadLimit.Store(concurrentUploadLimit) } - if req.Cpu != nil { + if req.GetCpu() != nil { host.CPU = resource.CPU{ - LogicalCount: req.Cpu.LogicalCount, - PhysicalCount: req.Cpu.PhysicalCount, - Percent: req.Cpu.Percent, - ProcessPercent: req.Cpu.ProcessPercent, + LogicalCount: req.Cpu.GetLogicalCount(), + PhysicalCount: req.Cpu.GetPhysicalCount(), + Percent: req.Cpu.GetPercent(), + ProcessPercent: req.Cpu.GetProcessPercent(), Times: resource.CPUTimes{ - User: req.Cpu.Times.User, - System: req.Cpu.Times.System, - Idle: req.Cpu.Times.Idle, - Nice: req.Cpu.Times.Nice, - Iowait: req.Cpu.Times.Iowait, - Irq: req.Cpu.Times.Irq, - Softirq: req.Cpu.Times.Softirq, - Steal: req.Cpu.Times.Steal, - Guest: req.Cpu.Times.Guest, - GuestNice: req.Cpu.Times.GuestNice, + User: req.Cpu.Times.GetUser(), + System: req.Cpu.Times.GetSystem(), + Idle: req.Cpu.Times.GetIdle(), + Nice: req.Cpu.Times.GetNice(), + Iowait: req.Cpu.Times.GetIowait(), + Irq: req.Cpu.Times.GetIrq(), + Softirq: req.Cpu.Times.GetSoftirq(), + Steal: req.Cpu.Times.GetSteal(), + Guest: req.Cpu.Times.GetGuest(), + GuestNice: req.Cpu.Times.GetGuestNice(), }, } } - if req.Memory != nil { + if req.GetMemory() != nil { host.Memory = resource.Memory{ - Total: req.Memory.Total, - Available: req.Memory.Available, - Used: req.Memory.Used, - UsedPercent: req.Memory.UsedPercent, - ProcessUsedPercent: req.Memory.ProcessUsedPercent, - Free: req.Memory.Free, + Total: req.Memory.GetTotal(), + Available: req.Memory.GetAvailable(), + Used: req.Memory.GetUsed(), + UsedPercent: req.Memory.GetUsedPercent(), + ProcessUsedPercent: req.Memory.GetProcessUsedPercent(), + Free: req.Memory.GetFree(), } } - if req.Network != nil { + if req.GetNetwork() != nil { host.Network = resource.Network{ - TCPConnectionCount: req.Network.TcpConnectionCount, - UploadTCPConnectionCount: req.Network.UploadTcpConnectionCount, - Location: req.Network.Location, - IDC: req.Network.Idc, + TCPConnectionCount: req.Network.GetTcpConnectionCount(), + UploadTCPConnectionCount: req.Network.GetUploadTcpConnectionCount(), + Location: req.Network.GetLocation(), + IDC: req.Network.GetIdc(), } } - if req.Disk != nil { + if req.GetDisk() != nil { host.Disk = resource.Disk{ - Total: req.Disk.Total, - Free: req.Disk.Free, - Used: req.Disk.Used, - UsedPercent: req.Disk.UsedPercent, - InodesTotal: req.Disk.InodesTotal, - InodesUsed: req.Disk.InodesUsed, - InodesFree: req.Disk.InodesFree, - InodesUsedPercent: req.Disk.InodesUsedPercent, + Total: req.Disk.GetTotal(), + Free: req.Disk.GetFree(), + Used: req.Disk.GetUsed(), + UsedPercent: req.Disk.GetUsedPercent(), + InodesTotal: req.Disk.GetInodesTotal(), + InodesUsed: req.Disk.GetInodesUsed(), + InodesFree: req.Disk.GetInodesFree(), + InodesUsedPercent: req.Disk.GetInodesUsedPercent(), } } - if req.Build != nil { + if req.GetBuild() != nil { host.Build = resource.Build{ - 
GitVersion: req.Build.GitVersion, - GitCommit: req.Build.GitCommit, - GoVersion: req.Build.GoVersion, - Platform: req.Build.Platform, + GitVersion: req.Build.GetGitVersion(), + GitCommit: req.Build.GetGitCommit(), + GoVersion: req.Build.GetGoVersion(), + Platform: req.Build.GetPlatform(), } } @@ -641,11 +644,11 @@ func (v *V1) AnnounceHost(ctx context.Context, req *schedulerv1.AnnounceHostRequ // LeaveHost releases host in scheduler. func (v *V1) LeaveHost(ctx context.Context, req *schedulerv1.LeaveHostRequest) error { - logger.WithHostID(req.Id).Infof("leave host request: %#v", req) + logger.WithHostID(req.GetId()).Infof("leave host request: %#v", req) - host, loaded := v.resource.HostManager().Load(req.Id) + host, loaded := v.resource.HostManager().Load(req.GetId()) if !loaded { - msg := fmt.Sprintf("host %s not found", req.Id) + msg := fmt.Sprintf("host %s not found", req.GetId()) logger.Error(msg) return dferrors.New(commonv1.Code_BadRequest, msg) } @@ -671,13 +674,13 @@ func (v *V1) SyncProbes(stream schedulerv1.Scheduler_SyncProbesServer) error { return err } - logger := logger.WithHost(req.Host.Id, req.Host.Hostname, req.Host.Ip) + logger := logger.WithHost(req.Host.GetId(), req.Host.GetHostname(), req.Host.GetIp()) switch syncProbesRequest := req.GetRequest().(type) { case *schedulerv1.SyncProbesRequest_ProbeStartedRequest: // Find probed hosts in network topology. Based on the source host information, // the most candidate hosts will be evaluated. logger.Info("receive SyncProbesRequest_ProbeStartedRequest") - hosts, err := v.networkTopology.FindProbedHosts(req.Host.Id) + hosts, err := v.networkTopology.FindProbedHosts(req.Host.GetId()) if err != nil { logger.Error(err) return status.Error(codes.FailedPrecondition, err.Error()) @@ -714,12 +717,12 @@ func (v *V1) SyncProbes(stream schedulerv1.Scheduler_SyncProbesServer) error { continue } - if err := v.networkTopology.Store(req.Host.Id, probedHost.ID); err != nil { + if err := v.networkTopology.Store(req.Host.GetId(), probedHost.ID); err != nil { logger.Errorf("store failed: %s", err.Error()) continue } - if err := v.networkTopology.Probes(req.Host.Id, probe.Host.Id).Enqueue(&networktopology.Probe{ + if err := v.networkTopology.Probes(req.Host.GetId(), probe.Host.Id).Enqueue(&networktopology.Probe{ Host: probedHost, RTT: probe.Rtt.AsDuration(), CreatedAt: probe.CreatedAt.AsTime(), @@ -780,8 +783,8 @@ func (v *V1) triggerTask(ctx context.Context, req *schedulerv1.PeerTaskRequest, // priority of the RegisterPeerTask parameter is // higher than parameter of the application. var priority commonv1.Priority - if req.UrlMeta.Priority != commonv1.Priority_LEVEL0 { - priority = req.UrlMeta.Priority + if req.UrlMeta.GetPriority() != commonv1.Priority_LEVEL0 { + priority = req.UrlMeta.GetPriority() } else { // Compatible with v1 version of priority enum. 
priority = types.PriorityV2ToV1(peer.CalculatePriority(dynconfig)) @@ -791,13 +794,13 @@ func (v *V1) triggerTask(ctx context.Context, req *schedulerv1.PeerTaskRequest, switch priority { case commonv1.Priority_LEVEL6, commonv1.Priority_LEVEL0: if v.config.SeedPeer.Enable && !task.IsSeedPeerFailed() { - if len(req.UrlMeta.Range) > 0 { - if rg, err := http.ParseURLMetaRange(req.UrlMeta.Range, math.MaxInt64); err == nil { + if len(req.UrlMeta.GetRange()) > 0 { + if rg, err := http.ParseURLMetaRange(req.UrlMeta.GetRange(), math.MaxInt64); err == nil { go v.triggerSeedPeerTask(ctx, &rg, task) return nil } - peer.Log.Errorf("range %s is invalid", req.UrlMeta.Range) + peer.Log.Errorf("range %s is invalid", req.UrlMeta.GetRange()) } else { go v.triggerSeedPeerTask(ctx, nil, task) return nil @@ -844,17 +847,17 @@ func (v *V1) triggerSeedPeerTask(ctx context.Context, rg *http.Range, task *reso // storeTask stores a new task or reuses a previous task. func (v *V1) storeTask(ctx context.Context, req *schedulerv1.PeerTaskRequest, typ commonv2.TaskType) *resource.Task { - filters := strings.Split(req.UrlMeta.Filter, idgen.URLFilterSeparator) + filters := strings.Split(req.UrlMeta.GetFilter(), idgen.URLFilterSeparator) - task, loaded := v.resource.TaskManager().Load(req.TaskId) + task, loaded := v.resource.TaskManager().Load(req.GetTaskId()) if !loaded { options := []resource.TaskOption{} - if d, err := digest.Parse(req.UrlMeta.Digest); err == nil { + if d, err := digest.Parse(req.UrlMeta.GetDigest()); err == nil { options = append(options, resource.WithDigest(d)) } - task := resource.NewTask(req.TaskId, req.Url, req.UrlMeta.Tag, req.UrlMeta.Application, - typ, filters, req.UrlMeta.Header, int32(v.config.Scheduler.BackToSourceCount), options...) + task := resource.NewTask(req.GetTaskId(), req.GetUrl(), req.UrlMeta.GetTag(), req.UrlMeta.GetApplication(), + typ, filters, req.UrlMeta.GetHeader(), int32(v.config.Scheduler.BackToSourceCount), options...) v.resource.TaskManager().Store(task) task.Log.Info("create new task") return task @@ -862,9 +865,9 @@ func (v *V1) storeTask(ctx context.Context, req *schedulerv1.PeerTaskRequest, ty // Task is the pointer, if the task already exists, the next request will // update the task's Url and UrlMeta in task manager. - task.URL = req.Url + task.URL = req.GetUrl() task.Filters = filters - task.Header = req.UrlMeta.Header + task.Header = req.UrlMeta.GetHeader() task.Log.Info("task already exists") return task } @@ -1275,8 +1278,8 @@ func (v *V1) handleTaskSuccess(ctx context.Context, task *resource.Task, req *sc } // Update task total piece count and content length. 
- task.TotalPieceCount.Store(req.TotalPieceCount) - task.ContentLength.Store(req.ContentLength) + task.TotalPieceCount.Store(req.GetTotalPieceCount()) + task.ContentLength.Store(req.GetContentLength()) if err := task.FSM.Event(ctx, resource.TaskEventDownloadSucceeded); err != nil { task.Log.Errorf("task fsm event failed: %s", err.Error()) @@ -1488,6 +1491,7 @@ func (v *V1) createDownloadRecord(peer *resource.Peer, parents []*resource.Peer, ConcurrentUploadCount: peer.Host.ConcurrentUploadCount.Load(), UploadCount: peer.Host.UploadCount.Load(), UploadFailedCount: peer.Host.UploadFailedCount.Load(), + SchedulerClusterID: int64(peer.Host.SchedulerClusterID), CreatedAt: peer.Host.CreatedAt.Load().UnixNano(), UpdatedAt: peer.Host.UpdatedAt.Load().UnixNano(), }, @@ -1546,9 +1550,9 @@ func (v *V1) createDownloadRecord(peer *resource.Peer, parents []*resource.Peer, Platform: peer.Host.Build.Platform, } - if req.Code != commonv1.Code_Success { + if req.GetCode() != commonv1.Code_Success { download.Error = storage.Error{ - Code: req.Code.String(), + Code: req.GetCode().String(), } } diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index 006113d8be1..45745e05843 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -536,6 +536,10 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ })) } + if req.Host.GetSchedulerClusterId() != 0 { + options = append(options, resource.WithSchedulerClusterID(req.Host.GetSchedulerClusterId())) + } + host = resource.NewHost( req.Host.GetId(), req.Host.GetIp(), req.Host.GetHostname(), req.Host.GetPort(), req.Host.GetDownloadPort(), types.HostType(req.Host.GetType()), diff --git a/scheduler/storage/storage_test.go b/scheduler/storage/storage_test.go index 4261b4f0d68..73ec305e1a3 100644 --- a/scheduler/storage/storage_test.go +++ b/scheduler/storage/storage_test.go @@ -112,8 +112,9 @@ var ( GoVersion: "1.19", Platform: "linux", }, - CreatedAt: time.Now().UnixNano(), - UpdatedAt: time.Now().UnixNano(), + SchedulerClusterID: 1, + CreatedAt: time.Now().UnixNano(), + UpdatedAt: time.Now().UnixNano(), } mockPiece = Piece{ diff --git a/scheduler/storage/types.go b/scheduler/storage/types.go index ffa8ee637ab..da3cd1bb165 100644 --- a/scheduler/storage/types.go +++ b/scheduler/storage/types.go @@ -117,6 +117,9 @@ type Host struct { // Build information. Build resource.Build `csv:"build"` + // SchedulerClusterID is scheduler cluster id. + SchedulerClusterID int64 `csv:"schedulerClusterId"` + // CreatedAt is peer create nanosecond time. 
CreatedAt int64 `csv:"createdAt"` diff --git a/trainer/storage/testdata/download.csv b/trainer/storage/testdata/download.csv index 4424870655e..bbc6d023ff7 100644 --- a/trainer/storage/testdata/download.csv +++ b/trainer/storage/testdata/download.csv @@ -1 +1 @@ -5,d,mq,Succeeded,unknow,unknow,1000,10,1,example.com,normal,2048,1,10,2,Succeeded,1689685929626279983,1689685929626280083,2,normal,localhost,127.0.0.1,8080,8081,linux,ubuntu,debian,1.0.0,1.0.0,100,40,20,3,24,12,0.8,0.4,100,101,102,103,104,105,106,107,108,109,20,19,16,0.7,0.2,15,400,200,china,e1,100,88,56,0.9,200,180,160,0.6,3.0.0,2bf4d5e,1.19,linux,1689685929626280163,1689685929626284802,,,,,0,0,0,,,,,0,0,,,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,,,0,0,0,0,0,0,0,0,,,,,0,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,0,0,,,,,0,0,0,,,,,0,0,,,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,,,0,0,0,0,0,0,0,0,,,,,0,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,0,0,,,,,0,0,0,,,,,0,0,,,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,,,0,0,0,0,0,0,0,0,,,,,0,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,0,0,,,,,0,0,0,,,,,0,0,,,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,,,0,0,0,0,0,0,0,0,,,,,0,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,0,0,,,,,0,0,0,,,,,0,0,,,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,,,0,0,0,0,0,0,0,0,,,,,0,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,0,0,,,,,0,0,0,,,,,0,0,,,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,,,0,0,0,0,0,0,0,0,,,,,0,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,0,0,,,,,0,0,0,,,,,0,0,,,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,,,0,0,0,0,0,0,0,0,,,,,0,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,0,0,,,,,0,0,0,,,,,0,0,,,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,,,0,0,0,0,0,0,0,0,,,,,0,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,0,0,,,,,0,0,0,,,,,0,0,,,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,,,0,0,0,0,0,0,0,0,,,,,0,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,0,0,,,,,0,0,0,,,,,0,0,,,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,,,0,0,0,0,0,0,0,0,,,,,0,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,0,0,,,,,0,0,0,,,,,0,0,,,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,,,0,0,0,0,0,0,0,0,,,,,0,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,0,0,,,,,0,0,0,,,,,0,0,,,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,,,0,0,0,0,0,0,0,0,,,,,0,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,0,0,,,,,0,0,0,,,,,0,0,,,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,,,0,0,0,0,0,0,0,0,,,,,0,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,0,0,,,,,0,0,0,,,,,0,0,,,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,,,0,0,0,0,0,0,0,0,,,,,0,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,0,0,,,,,0,0,0,,,,,0,0,,,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,,,0,0,0,0,0,0,0,0,,,,,0,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,0,0,,,,,0,0,0,,,,,0,0,,,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,,,0,0,0,0,0,0,0,0,,,,,0,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,0,0,,,,,0,0,0,,,,,0,0,,,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,,,0,0,0,0,0,0,0,0,,,,,0,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,0,0,,,,,0,0,0,,,,,0,0,,,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,,,0,0,0,0,0,0,0,0,,,,,0,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,0,0,,,,,0,0,0,,,,,0,0,,,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,,,0,0,0,0,0,0,0,0,,,,,0,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,0,0,4,m,db,Succeeded,1000,10,10,2,normal,localhost,127.0.0.1,8080,8081,linux,ubuntu,debian,1.0.0,1.0.0,100,40,20,3,24,12,0.8,0.4,100,101,102,103,104,105,106,107,108,109,20,19,16,0.7,0.2,15,400,200,china,e1,100,88,56,0.9,200,180,160,0.6,3.0.0,2bf4d5e,1.19,linux,1689685929626280163,1689685929626284802,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
0,0,0,0,0,0,0,0,20,10,1689685929626284872,1689685929626286204,1689685929626286274,1689685929626310119,1689685929626310189 +,,,,,,0,0,,,,0,0,0,0,,0,0,,,,,0,0,,,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,,,0,0,0,0,0,0,0,0,,,,,0,0,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,0,0
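
A minimal, self-contained Go sketch of the data flow this patch introduces, for reviewers who want the end-to-end picture without reading every hunk: the manager-backed dynconfig caches the scheduler cluster id it learned while resolving scheduler addresses, the announcer copies that id into its announce-host request, and the scheduler only records the id on the host when a non-zero value was reported. The types below are simplified stand-ins for config.Dynconfig, schedulerv1.AnnounceHostRequest, and resource.Host, not the real generated or exported APIs.

package main

import "fmt"

// announceHostRequest is a simplified stand-in for schedulerv1.AnnounceHostRequest;
// only the field added by this patch is modeled.
type announceHostRequest struct {
	ID                 string
	SchedulerClusterID uint64
}

// dynconfig is a simplified stand-in for the client's config.Dynconfig interface.
// The manager-backed implementation caches the cluster id while resolving
// scheduler addresses; the local implementation always returns 0.
type dynconfig interface {
	GetSchedulerClusterID() uint64
}

type managerDynconfig struct{ schedulerClusterID uint64 }

func (d *managerDynconfig) GetSchedulerClusterID() uint64 { return d.schedulerClusterID }

// newAnnounceHostRequest mirrors how the announcer fills the new field from dynconfig.
func newAnnounceHostRequest(hostID string, dyn dynconfig) *announceHostRequest {
	return &announceHostRequest{
		ID:                 hostID,
		SchedulerClusterID: dyn.GetSchedulerClusterID(),
	}
}

// host and hostOption model the scheduler's resource.Host and its functional options.
type host struct {
	ID                 string
	SchedulerClusterID uint64
}

type hostOption func(h *host)

// withSchedulerClusterID mirrors resource.WithSchedulerClusterID.
func withSchedulerClusterID(id uint64) hostOption {
	return func(h *host) { h.SchedulerClusterID = id }
}

func newHost(id string, options ...hostOption) *host {
	h := &host{ID: id}
	for _, opt := range options {
		opt(h)
	}
	return h
}

// announceHost mirrors the scheduler's AnnounceHost handling: the option is only
// applied when the peer reported a non-zero cluster id.
func announceHost(req *announceHostRequest) *host {
	options := []hostOption{}
	if req.SchedulerClusterID != 0 {
		options = append(options, withSchedulerClusterID(req.SchedulerClusterID))
	}
	return newHost(req.ID, options...)
}

func main() {
	dyn := &managerDynconfig{schedulerClusterID: 1}
	req := newAnnounceHostRequest("host-1", dyn)
	h := announceHost(req)
	fmt.Printf("host %s announced with scheduler cluster id %d\n", h.ID, h.SchedulerClusterID)
}

Under these assumptions, the sketch also shows why the local dynconfig returning 0 is safe: a zero cluster id simply skips the host option, leaving Host.SchedulerClusterID at its zero value, which matches the new test expectations in host_test.go.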