diff --git a/scheduler/resource/persistentcache/host_manager.go b/scheduler/resource/persistentcache/host_manager.go index 50a28ae7fb3..cb0a0d25495 100644 --- a/scheduler/resource/persistentcache/host_manager.go +++ b/scheduler/resource/persistentcache/host_manager.go @@ -23,14 +23,20 @@ import ( "strconv" "time" - "github.com/redis/go-redis/v9" + redis "github.com/redis/go-redis/v9" logger "d7y.io/dragonfly/v2/internal/dflog" + pkggc "d7y.io/dragonfly/v2/pkg/gc" pkgredis "d7y.io/dragonfly/v2/pkg/redis" pkgtypes "d7y.io/dragonfly/v2/pkg/types" "d7y.io/dragonfly/v2/scheduler/config" ) +const ( + // GCHostID is the id of the GC task that the persistent cache host manager registers. + GCHostID = "persistent-cache-host" +) + // HostManager is the interface used for host manager. type HostManager interface { // Load returns host by a key. @@ -44,6 +50,9 @@ type HostManager interface { // LoadAll returns all hosts. LoadAll(context.Context) ([]*Host, error) + + // RunGC runs garbage collection. + RunGC() error } // hostManager contains content for host manager. @@ -56,14 +65,25 @@ type hostManager struct { } // New host manager interface. -func newHostManager(cfg *config.Config, rdb redis.UniversalClient) HostManager { - return &hostManager{config: cfg, rdb: rdb} +func newHostManager(cfg *config.Config, gc pkggc.GC, rdb redis.UniversalClient) (HostManager, error) { + h := &hostManager{config: cfg, rdb: rdb} + + if err := gc.Add(pkggc.Task{ + ID: GCHostID, + Interval: cfg.Scheduler.GC.HostGCInterval, + Timeout: cfg.Scheduler.GC.HostGCInterval, + Runner: h, + }); err != nil { + return nil, err + } + + return h, nil } // Load returns host by a key. 
-func (t *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) { +func (h *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) { log := logger.WithHostID(hostID) - rawHost, err := t.rdb.HGetAll(ctx, pkgredis.MakePersistentCacheHostKeyInScheduler(t.config.Manager.SchedulerClusterID, hostID)).Result() + rawHost, err := h.rdb.HGetAll(ctx, pkgredis.MakePersistentCacheHostKeyInScheduler(h.config.Manager.SchedulerClusterID, hostID)).Result() if err != nil { log.Errorf("getting host failed from redis: %v", err) return nil, false @@ -427,9 +447,9 @@ func (t *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) { } // Store sets host. -func (t *hostManager) Store(ctx context.Context, host *Host) error { - _, err := t.rdb.HSet(ctx, - pkgredis.MakePersistentCacheHostKeyInScheduler(t.config.Manager.SchedulerClusterID, host.ID), +func (h *hostManager) Store(ctx context.Context, host *Host) error { + _, err := h.rdb.HSet(ctx, + pkgredis.MakePersistentCacheHostKeyInScheduler(h.config.Manager.SchedulerClusterID, host.ID), "id", host.ID, "type", host.Type.Name(), "hostname", host.Hostname, @@ -494,13 +514,13 @@ func (t *hostManager) Store(ctx context.Context, host *Host) error { } // Delete deletes host by a key. -func (t *hostManager) Delete(ctx context.Context, hostID string) error { - _, err := t.rdb.Del(ctx, pkgredis.MakePersistentCacheHostKeyInScheduler(t.config.Manager.SchedulerClusterID, hostID)).Result() +func (h *hostManager) Delete(ctx context.Context, hostID string) error { + _, err := h.rdb.Del(ctx, pkgredis.MakePersistentCacheHostKeyInScheduler(h.config.Manager.SchedulerClusterID, hostID)).Result() return err } // LoadAll returns all hosts. 
-func (t *hostManager) LoadAll(ctx context.Context) ([]*Host, error) { +func (h *hostManager) LoadAll(ctx context.Context) ([]*Host, error) { var ( hosts []*Host cursor uint64 @@ -512,14 +532,14 @@ func (t *hostManager) LoadAll(ctx context.Context) ([]*Host, error) { err error ) - hostKeys, cursor, err = t.rdb.Scan(ctx, cursor, pkgredis.MakePersistentCacheHostsInScheduler(t.config.Manager.SchedulerClusterID), 10).Result() + hostKeys, cursor, err = h.rdb.Scan(ctx, cursor, pkgredis.MakePersistentCacheHostsInScheduler(h.config.Manager.SchedulerClusterID), 10).Result() if err != nil { logger.Error("scan hosts failed") return nil, err } for _, hostKey := range hostKeys { - host, loaded := t.Load(ctx, hostKey) + host, loaded := h.Load(ctx, hostKey) if !loaded { logger.WithHostID(hostKey).Error("load host failed") continue @@ -535,3 +555,26 @@ func (t *hostManager) LoadAll(ctx context.Context) ([]*Host, error) { return hosts, nil } + +// RunGC runs garbage collection. +func (h *hostManager) RunGC() error { + hosts, err := h.LoadAll(context.Background()) + if err != nil { + logger.Error("load all hosts failed") + return err + } + + for _, host := range hosts { + // If the time since the host was last updated exceeds twice its announce interval, + // then delete the host; a failed deletion is only logged. 
+ elapsed := time.Since(host.UpdatedAt) + if host.AnnounceInterval > 0 && elapsed > host.AnnounceInterval*2 { + host.Log.Info("host has been reclaimed") + if err := h.Delete(context.Background(), host.ID); err != nil { + host.Log.Errorf("delete host failed: %v", err) + } + } + } + + return nil +} diff --git a/scheduler/resource/persistentcache/host_manager_mock.go b/scheduler/resource/persistentcache/host_manager_mock.go index 68cee0e56ed..ac3a755fe2f 100644 --- a/scheduler/resource/persistentcache/host_manager_mock.go +++ b/scheduler/resource/persistentcache/host_manager_mock.go @@ -84,6 +84,20 @@ func (mr *MockHostManagerMockRecorder) LoadAll(arg0 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadAll", reflect.TypeOf((*MockHostManager)(nil).LoadAll), arg0) } +// RunGC mocks base method. +func (m *MockHostManager) RunGC() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RunGC") + ret0, _ := ret[0].(error) + return ret0 +} + +// RunGC indicates an expected call of RunGC. +func (mr *MockHostManagerMockRecorder) RunGC() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RunGC", reflect.TypeOf((*MockHostManager)(nil).RunGC)) +} + // Store mocks base method. func (m *MockHostManager) Store(arg0 context.Context, arg1 *Host) error { m.ctrl.T.Helper() diff --git a/scheduler/resource/persistentcache/peer_manager.go b/scheduler/resource/persistentcache/peer_manager.go index c6471f5044f..7979383cdfe 100644 --- a/scheduler/resource/persistentcache/peer_manager.go +++ b/scheduler/resource/persistentcache/peer_manager.go @@ -58,6 +58,12 @@ type PeerManager interface { // DeleteAllByTaskID deletes all peers by task id. DeleteAllByTaskID(context.Context, string) error + + // LoadAllByHostID returns all peers by host id. + LoadAllByHostID(context.Context, string) ([]*Peer, error) + + // DeleteAllByHostID deletes all peers by host id. 
+ DeleteAllByHostID(context.Context, string) error } // peerManager contains content for peer manager. @@ -366,6 +372,59 @@ func (p *peerManager) DeleteAllByTaskID(ctx context.Context, taskID string) erro } } - p.taskManager.Delete(ctx, taskID) + return nil +} + +// LoadAllByHostID returns all persistent cache peers by host id; peers that fail to load are logged and skipped. +func (p *peerManager) LoadAllByHostID(ctx context.Context, hostID string) ([]*Peer, error) { + log := logger.WithHostID(hostID) + peerIDs, err := p.rdb.SMembers(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(p.config.Manager.SchedulerClusterID, hostID)).Result() + if err != nil { + log.Error("get peer ids failed") + return nil, err + } + + peers := make([]*Peer, 0, len(peerIDs)) + for _, peerID := range peerIDs { + peer, loaded := p.Load(ctx, peerID) + if !loaded { + log.Errorf("load peer %s failed", peerID) + continue + } + + peers = append(peers, peer) + } + + return peers, nil +} + +// DeleteAllByHostID deletes all persistent cache peers by host id; a peer whose deletion fails is logged and skipped. 
+func (p *peerManager) DeleteAllByHostID(ctx context.Context, hostID string) error { + log := logger.WithHostID(hostID) + peers, err := p.LoadAllByHostID(ctx, hostID) + if err != nil { + log.Error("load peers failed") + return err + } + + for _, peer := range peers { + addr := fmt.Sprintf("%s:%d", peer.Host.IP, peer.Host.Port) + client, err := dfdaemonclient.GetV2ByAddr(ctx, addr, grpc.WithTransportCredentials(p.transportCredentials)) + if err != nil { + log.Errorf("get dfdaemon client failed: %v", err) + continue + } + + if err := client.DeletePersistentCacheTask(ctx, &dfdaemonv2.DeletePersistentCacheTaskRequest{TaskId: peer.Task.ID}); err != nil { + log.Errorf("delete task %s failed: %v", peer.Task.ID, err) + continue + } + + if err := p.Delete(ctx, peer.ID); err != nil { + log.Errorf("delete peer %s failed: %v", peer.ID, err) + continue + } + } + return nil } diff --git a/scheduler/resource/persistentcache/peer_manager_mock.go b/scheduler/resource/persistentcache/peer_manager_mock.go index 4231d7e5ec1..fe3634bbb01 100644 --- a/scheduler/resource/persistentcache/peer_manager_mock.go +++ b/scheduler/resource/persistentcache/peer_manager_mock.go @@ -54,6 +54,20 @@ func (mr *MockPeerManagerMockRecorder) Delete(arg0, arg1 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockPeerManager)(nil).Delete), arg0, arg1) } +// DeleteAllByHostID mocks base method. +func (m *MockPeerManager) DeleteAllByHostID(arg0 context.Context, arg1 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteAllByHostID", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteAllByHostID indicates an expected call of DeleteAllByHostID. +func (mr *MockPeerManagerMockRecorder) DeleteAllByHostID(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteAllByHostID", reflect.TypeOf((*MockPeerManager)(nil).DeleteAllByHostID), arg0, arg1) +} + // DeleteAllByTaskID mocks base method. 
func (m *MockPeerManager) DeleteAllByTaskID(arg0 context.Context, arg1 string) error { m.ctrl.T.Helper() @@ -98,6 +112,21 @@ func (mr *MockPeerManagerMockRecorder) LoadAll(arg0 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadAll", reflect.TypeOf((*MockPeerManager)(nil).LoadAll), arg0) } +// LoadAllByHostID mocks base method. +func (m *MockPeerManager) LoadAllByHostID(arg0 context.Context, arg1 string) ([]*Peer, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "LoadAllByHostID", arg0, arg1) + ret0, _ := ret[0].([]*Peer) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// LoadAllByHostID indicates an expected call of LoadAllByHostID. +func (mr *MockPeerManagerMockRecorder) LoadAllByHostID(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadAllByHostID", reflect.TypeOf((*MockPeerManager)(nil).LoadAllByHostID), arg0, arg1) +} + // LoadAllByTaskID mocks base method. func (m *MockPeerManager) LoadAllByTaskID(arg0 context.Context, arg1 string) ([]*Peer, error) { m.ctrl.T.Helper() diff --git a/scheduler/resource/persistentcache/resource.go b/scheduler/resource/persistentcache/resource.go index d1c0b0705a3..ec56cef81bc 100644 --- a/scheduler/resource/persistentcache/resource.go +++ b/scheduler/resource/persistentcache/resource.go @@ -22,6 +22,8 @@ import ( redis "github.com/redis/go-redis/v9" "google.golang.org/grpc/credentials" + logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/pkg/gc" "d7y.io/dragonfly/v2/scheduler/config" ) @@ -50,11 +52,16 @@ type resource struct { } // New returns Resource interface. 
-func New(cfg *config.Config, rdb redis.UniversalClient, transportCredentials credentials.TransportCredentials) Resource { +func New(cfg *config.Config, gc gc.GC, rdb redis.UniversalClient, transportCredentials credentials.TransportCredentials) (Resource, error) { taskManager := newTaskManager(cfg, rdb) - hostManager := newHostManager(cfg, rdb) + hostManager, err := newHostManager(cfg, gc, rdb) + if err != nil { + logger.Errorf("failed to create host manager: %v", err) + return nil, err + } + peerManager := newPeerManager(cfg, rdb, taskManager, hostManager, transportCredentials) - return &resource{peerManager, taskManager, hostManager} + return &resource{peerManager, taskManager, hostManager}, nil } // Host manager interface. diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 39bd529582e..493089c1fed 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -193,7 +193,11 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err } // Initialize persistent cache resource. - s.persistentCacheResource = persistentcache.New(cfg, rdb, peerClientTransportCredentials) + s.persistentCacheResource, err = persistentcache.New(cfg, s.gc, rdb, peerClientTransportCredentials) + if err != nil { + logger.Errorf("failed to create persistent cache resource: %v", err) + return nil, err + } // Initialize job service. if cfg.Job.Enable && rdb != nil {