From 4c533395e18098c741fa9f7bcc1a1bd3f1944407 Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Tue, 11 Feb 2025 13:58:41 +0100 Subject: [PATCH] Scheduler: Resync reserved periodically to keep state consistent Add resyncReserved, which removes deleted vPods from reserved to keep the state consistent when leadership changes (Promote / Demote). `initReserved` is not enough since the vPod lister can be stale. Signed-off-by: Pierangelo Di Pilato --- pkg/scheduler/scheduler.go | 2 + pkg/scheduler/statefulset/scheduler.go | 64 ++++++++++++++++++++++++++ pkg/scheduler/testing/vpod.go | 4 ++ 3 files changed, 70 insertions(+) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 0dd4f2b6c8c..d682b9acfdd 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -66,6 +66,8 @@ func (f SchedulerFunc) Schedule(ctx context.Context, vpod VPod) ([]duckv1alpha1. // VPod represents virtual replicas placed into real Kubernetes pods // The scheduler is responsible for placing VPods type VPod interface { + GetDeletionTimestamp() *metav1.Time + // GetKey returns the VPod key (namespace/name). GetKey() types.NamespacedName diff --git a/pkg/scheduler/statefulset/scheduler.go b/pkg/scheduler/statefulset/scheduler.go index cf2834e7c34..13b1887481c 100644 --- a/pkg/scheduler/statefulset/scheduler.go +++ b/pkg/scheduler/statefulset/scheduler.go @@ -21,6 +21,7 @@ import ( "fmt" "sort" "sync" + "sync/atomic" "time" "go.uber.org/zap" @@ -114,6 +115,11 @@ type StatefulSetScheduler struct { // replicas is the (cached) number of statefulset replicas. replicas int32 + // isLeader signals whether a given Scheduler instance is leader or not. + // The scheduler is considered the leader when ephemeralLeaderElectionObject is in a + // bucket where we've been promoted. + isLeader atomic.Bool + // reserved tracks vreplicas that have been placed (ie. scheduled) but haven't been // committed yet (ie. 
not appearing in vpodLister) reserved map[types.NamespacedName]map[string]int32 @@ -130,6 +136,9 @@ func (s *StatefulSetScheduler) Promote(b reconciler.Bucket, enq func(reconciler. if !b.Has(ephemeralLeaderElectionObject) { return nil } + // The promoted bucket has the ephemeralLeaderElectionObject, so we are now the leader. + // Flip the flag after running initReserved. + defer s.isLeader.Store(true) if v, ok := s.autoscaler.(reconciler.LeaderAware); ok { return v.Promote(b, enq) @@ -151,6 +160,9 @@ func (s *StatefulSetScheduler) initReserved() error { s.reserved = make(map[types.NamespacedName]map[string]int32, len(vPods)) for _, vPod := range vPods { + if !vPod.GetDeletionTimestamp().IsZero() { + continue + } s.reserved[vPod.GetKey()] = make(map[string]int32, len(vPod.GetPlacements())) for _, placement := range vPod.GetPlacements() { s.reserved[vPod.GetKey()][placement.PodName] += placement.VReplicas @@ -159,8 +171,41 @@ func (s *StatefulSetScheduler) initReserved() error { return nil } +// resyncReserved removes deleted vPods from reserved to keep the state consistent when leadership +// changes (Promote / Demote). +// initReserved is not enough since the vPod lister can be stale. +func (s *StatefulSetScheduler) resyncReserved() error { + if !s.isLeader.Load() { + return nil + } + + vPods, err := s.vpodLister() + if err != nil { + return fmt.Errorf("failed to list vPods during reserved resync: %w", err) + } + vPodsByK := vPodsByKey(vPods) + + s.reservedMu.Lock() + defer s.reservedMu.Unlock() + + for key := range s.reserved { + vPod, ok := vPodsByK[key] + if !ok || vPod == nil { + delete(s.reserved, key) + } + } + + return nil +} + // Demote implements reconciler.LeaderAware. func (s *StatefulSetScheduler) Demote(b reconciler.Bucket) { + if !b.Has(ephemeralLeaderElectionObject) { + return + } + // The demoted bucket has the ephemeralLeaderElectionObject, so we are not leader anymore. 
+ defer s.isLeader.Store(false) + if v, ok := s.autoscaler.(reconciler.LeaderAware); ok { v.Demote(b) } @@ -208,6 +253,17 @@ func newStatefulSetScheduler(ctx context.Context, sif.Shutdown() }() + go func() { + for { + select { + case <-ctx.Done(): + return + case <-time.After(cfg.RefreshPeriod * 3): + _ = s.resyncReserved() + } + } + }() + return s } @@ -561,3 +617,11 @@ func upsertPlacements(placements []duckv1alpha1.Placement, placement duckv1alpha } return placements } + +func vPodsByKey(vPods []scheduler.VPod) map[types.NamespacedName]scheduler.VPod { + r := make(map[types.NamespacedName]scheduler.VPod, len(vPods)) + for _, vPod := range vPods { + r[vPod.GetKey()] = vPod + } + return r +} diff --git a/pkg/scheduler/testing/vpod.go b/pkg/scheduler/testing/vpod.go index f11dda3e488..52c96bb95cf 100644 --- a/pkg/scheduler/testing/vpod.go +++ b/pkg/scheduler/testing/vpod.go @@ -57,6 +57,10 @@ func NewVPod(ns, name string, vreplicas int32, placements []duckv1alpha1.Placeme } } +func (d *sampleVPod) GetDeletionTimestamp() *metav1.Time { + return nil +} + func (d *sampleVPod) GetKey() types.NamespacedName { return d.key }