diff --git a/kwok/charts/crds/karpenter.sh_nodepools.yaml b/kwok/charts/crds/karpenter.sh_nodepools.yaml index 6601e59dbf..a898f5c361 100644 --- a/kwok/charts/crds/karpenter.sh_nodepools.yaml +++ b/kwok/charts/crds/karpenter.sh_nodepools.yaml @@ -498,6 +498,12 @@ spec: - type type: object type: array + nodeClassObservedGeneration: + description: |- + NodeClassObservedGeneration represents the observed nodeClass generation for the referenced nodeClass. If this does not match + the actual NodeClass Generation, NodeRegistrationHealthy status condition on the NodePool will be reset + format: int64 + type: integer resources: additionalProperties: anyOf: diff --git a/pkg/apis/crds/karpenter.sh_nodepools.yaml b/pkg/apis/crds/karpenter.sh_nodepools.yaml index 157aaf13c4..725b953af7 100644 --- a/pkg/apis/crds/karpenter.sh_nodepools.yaml +++ b/pkg/apis/crds/karpenter.sh_nodepools.yaml @@ -496,6 +496,12 @@ spec: - type type: object type: array + nodeClassObservedGeneration: + description: |- + NodeClassObservedGeneration represents the observed nodeClass generation for the referenced nodeClass. If this does not match + the actual NodeClass Generation, NodeRegistrationHealthy status condition on the NodePool will be reset + format: int64 + type: integer resources: additionalProperties: anyOf: diff --git a/pkg/apis/v1/nodepool_status.go b/pkg/apis/v1/nodepool_status.go index 1b3f974694..8fe17c376b 100644 --- a/pkg/apis/v1/nodepool_status.go +++ b/pkg/apis/v1/nodepool_status.go @@ -27,6 +27,8 @@ const ( ConditionTypeValidationSucceeded = "ValidationSucceeded" // ConditionTypeNodeClassReady = "NodeClassReady" condition indicates that underlying nodeClass was resolved and is reporting as Ready ConditionTypeNodeClassReady = "NodeClassReady" + // ConditionTypeNodeRegistrationHealthy = "NodeRegistrationHealthy" condition indicates if a misconfiguration exists that is preventing successful node launch/registrations and requires manual investigation + ConditionTypeNodeRegistrationHealthy = "NodeRegistrationHealthy" ) // NodePoolStatus defines the observed state of NodePool @@ -34,6 +36,10 @@ type NodePoolStatus struct { // Resources is the list of resources that have been provisioned. // +optional Resources v1.ResourceList `json:"resources,omitempty"` + // NodeClassObservedGeneration represents the observed nodeClass generation for the referenced nodeClass.
If this does not match + // the actual NodeClass Generation, NodeRegistrationHealthy status condition on the NodePool will be reset + // +optional + NodeClassObservedGeneration int64 `json:"nodeClassObservedGeneration,omitempty"` // Conditions contains signals for health and readiness // +optional Conditions []status.Condition `json:"conditions,omitempty"` diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go index 682bf172fd..544def960e 100644 --- a/pkg/controllers/controllers.go +++ b/pkg/controllers/controllers.go @@ -50,6 +50,7 @@ import ( nodepoolcounter "sigs.k8s.io/karpenter/pkg/controllers/nodepool/counter" nodepoolhash "sigs.k8s.io/karpenter/pkg/controllers/nodepool/hash" nodepoolreadiness "sigs.k8s.io/karpenter/pkg/controllers/nodepool/readiness" + nodepoolregistrationhealth "sigs.k8s.io/karpenter/pkg/controllers/nodepool/registrationhealth" nodepoolvalidation "sigs.k8s.io/karpenter/pkg/controllers/nodepool/validation" "sigs.k8s.io/karpenter/pkg/controllers/provisioning" "sigs.k8s.io/karpenter/pkg/controllers/state" @@ -88,6 +89,7 @@ func NewControllers( metricsnodepool.NewController(kubeClient, cloudProvider), metricsnode.NewController(cluster), nodepoolreadiness.NewController(kubeClient, cloudProvider), + nodepoolregistrationhealth.NewController(kubeClient, cloudProvider), nodepoolcounter.NewController(kubeClient, cloudProvider, cluster), nodepoolvalidation.NewController(kubeClient, cloudProvider), podevents.NewController(clock, kubeClient, cloudProvider), diff --git a/pkg/controllers/metrics/pod/controller.go b/pkg/controllers/metrics/pod/controller.go index 8b14ef067c..fee6579171 100644 --- a/pkg/controllers/metrics/pod/controller.go +++ b/pkg/controllers/metrics/pod/controller.go @@ -252,7 +252,7 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco }) c.recordPodSchedulingUndecidedMetric(pod) // Get the time for when we Karpenter first thought the pod was schedulable. This should be zero if we didn't simulate for this pod. 
- schedulableTime := c.cluster.PodSchedulingSuccessTime(types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}) + schedulableTime := c.cluster.PodSchedulingSuccessTime(types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, false) c.recordPodStartupMetric(pod, schedulableTime) c.recordPodBoundMetric(pod, schedulableTime) // Requeue every 30s for pods that are stuck without a state change diff --git a/pkg/controllers/metrics/pod/suite_test.go b/pkg/controllers/metrics/pod/suite_test.go index a69794e22f..2ee4021e07 100644 --- a/pkg/controllers/metrics/pod/suite_test.go +++ b/pkg/controllers/metrics/pod/suite_test.go @@ -105,7 +105,7 @@ var _ = Describe("Pod Metrics", func() { p.Status.Phase = corev1.PodPending fakeClock.Step(1 * time.Hour) - cluster.MarkPodSchedulingDecisions(map[*corev1.Pod]error{}, p) + cluster.MarkPodSchedulingDecisions(ctx, map[*corev1.Pod]error{}, p) // PodScheduled condition does not exist, emit pods_unbound_time_seconds metric ExpectApplied(ctx, env.Client, p) @@ -182,7 +182,7 @@ var _ = Describe("Pod Metrics", func() { p.Status.Phase = corev1.PodPending fakeClock.Step(1 * time.Hour) - cluster.MarkPodSchedulingDecisions(map[*corev1.Pod]error{}, p) + cluster.MarkPodSchedulingDecisions(ctx, map[*corev1.Pod]error{}, p) ExpectApplied(ctx, env.Client, p) ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) //This will add pod to pending pods and unscheduled pods set _, found := FindMetricWithLabelValues("karpenter_pods_unstarted_time_seconds", map[string]string{ @@ -271,7 +271,7 @@ var _ = Describe("Pod Metrics", func() { }) Expect(found).To(BeTrue()) - cluster.MarkPodSchedulingDecisions(map[*corev1.Pod]error{}, p) + cluster.MarkPodSchedulingDecisions(ctx, map[*corev1.Pod]error{}, p) ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) _, found = FindMetricWithLabelValues("karpenter_pods_scheduling_undecided_time_seconds", map[string]string{ @@ -318,7 +318,7 @@ var _ = Describe("Pod Metrics", func() { p.Status.Phase = corev1.PodPending ExpectApplied(ctx, env.Client, p) - cluster.MarkPodSchedulingDecisions(map[*corev1.Pod]error{}, p) + cluster.MarkPodSchedulingDecisions(ctx, map[*corev1.Pod]error{}, p) ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) _, found := FindMetricWithLabelValues("karpenter_pods_unbound_time_seconds", map[string]string{ diff --git a/pkg/controllers/nodeclaim/lifecycle/liveness.go b/pkg/controllers/nodeclaim/lifecycle/liveness.go index fc1a272752..d98b375bc9 100644 --- a/pkg/controllers/nodeclaim/lifecycle/liveness.go +++ b/pkg/controllers/nodeclaim/lifecycle/liveness.go @@ -20,9 +20,14 @@ import ( "context" "time" + "k8s.io/apimachinery/pkg/api/errors" + + "k8s.io/apimachinery/pkg/types" + + "sigs.k8s.io/controller-runtime/pkg/log" + "k8s.io/utils/clock" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" v1 "sigs.k8s.io/karpenter/pkg/apis/v1" @@ -51,6 +56,12 @@ func (l *Liveness) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (reco if ttl := registrationTTL - l.clock.Since(registered.LastTransitionTime.Time); ttl > 0 { return reconcile.Result{RequeueAfter: ttl}, nil } + if err := l.updateNodePoolRegistrationHealth(ctx, nodeClaim); err != nil { + if errors.IsConflict(err) { + return reconcile.Result{Requeue: true}, nil + } + return reconcile.Result{}, err + } // Delete the NodeClaim if we believe the NodeClaim won't register since we haven't seen the node if err := 
l.kubeClient.Delete(ctx, nodeClaim); err != nil { return reconcile.Result{}, client.IgnoreNotFound(err) @@ -61,6 +72,34 @@ func (l *Liveness) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (reco metrics.NodePoolLabel: nodeClaim.Labels[v1.NodePoolLabelKey], metrics.CapacityTypeLabel: nodeClaim.Labels[v1.CapacityTypeLabelKey], }) - return reconcile.Result{}, nil } + +// updateNodePoolRegistrationHealth sets the NodeRegistrationHealthy=False +// on the NodePool if the nodeClaim fails to launch/register +func (l *Liveness) updateNodePoolRegistrationHealth(ctx context.Context, nodeClaim *v1.NodeClaim) error { + nodePoolName, ok := nodeClaim.Labels[v1.NodePoolLabelKey] + if ok && len(nodePoolName) != 0 { + nodePool := &v1.NodePool{} + if err := l.kubeClient.Get(ctx, types.NamespacedName{Name: nodePoolName}, nodePool); err != nil { + return err + } + if nodePool.StatusConditions().Get(v1.ConditionTypeNodeRegistrationHealthy).IsUnknown() { + stored := nodePool.DeepCopy() + // If the nodeClaim failed to register during the TTL set NodeRegistrationHealthy status condition on + // NodePool to False. If the launch failed get the launch failure reason and message from nodeClaim. + if launchCondition := nodeClaim.StatusConditions().Get(v1.ConditionTypeLaunched); launchCondition.IsTrue() { + nodePool.StatusConditions().SetFalse(v1.ConditionTypeNodeRegistrationHealthy, "RegistrationFailed", "Failed to register node") + } else { + nodePool.StatusConditions().SetFalse(v1.ConditionTypeNodeRegistrationHealthy, launchCondition.Reason, launchCondition.Message) + } + // We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch + // can cause races due to the fact that it fully replaces the list on a change + // Here, we are updating the status condition list + if err := l.kubeClient.Status().Patch(ctx, nodePool, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); client.IgnoreNotFound(err) != nil { + return err + } + } + } + return nil +} diff --git a/pkg/controllers/nodeclaim/lifecycle/liveness_test.go b/pkg/controllers/nodeclaim/lifecycle/liveness_test.go index 8fe3421782..411cf43a7d 100644 --- a/pkg/controllers/nodeclaim/lifecycle/liveness_test.go +++ b/pkg/controllers/nodeclaim/lifecycle/liveness_test.go @@ -19,6 +19,9 @@ package lifecycle_test import ( "time" + "github.com/awslabs/operatorpkg/status" + + operatorpkg "github.com/awslabs/operatorpkg/test/expectations" . 
"github.com/onsi/ginkgo/v2" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -78,6 +81,12 @@ var _ = Describe("Liveness", func() { ExpectFinalizersRemoved(ctx, env.Client, nodeClaim) if isManagedNodeClaim { ExpectNotFound(ctx, env.Client, nodeClaim) + operatorpkg.ExpectStatusConditions(ctx, env.Client, 1*time.Minute, nodePool, status.Condition{ + Type: v1.ConditionTypeNodeRegistrationHealthy, + Status: metav1.ConditionFalse, + Reason: "RegistrationFailed", + Message: "Failed to register node", + }) } else { ExpectExists(ctx, env.Client, nodeClaim) } @@ -141,6 +150,45 @@ var _ = Describe("Liveness", func() { // If the node hasn't registered in the registration timeframe, then we deprovision the nodeClaim fakeClock.Step(time.Minute * 20) _ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim) + operatorpkg.ExpectStatusConditions(ctx, env.Client, 1*time.Minute, nodePool, status.Condition{ + Type: v1.ConditionTypeNodeRegistrationHealthy, + Status: metav1.ConditionFalse, + Reason: nodeClaim.StatusConditions().Get(v1.ConditionTypeLaunched).Reason, + Message: nodeClaim.StatusConditions().Get(v1.ConditionTypeLaunched).Message, + }) + ExpectFinalizersRemoved(ctx, env.Client, nodeClaim) + ExpectNotFound(ctx, env.Client, nodeClaim) + }) + It("should not update NodeRegistrationHealthy status condition if it is already set to True", func() { + nodePool.StatusConditions().SetTrue(v1.ConditionTypeNodeRegistrationHealthy) + nodeClaim := test.NodeClaim(v1.NodeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1.NodePoolLabelKey: nodePool.Name, + }, + }, + Spec: v1.NodeClaimSpec{ + Resources: v1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("2"), + corev1.ResourceMemory: resource.MustParse("50Mi"), + corev1.ResourcePods: resource.MustParse("5"), + fake.ResourceGPUVendorA: resource.MustParse("1"), + }, + }, + }, + }) + cloudProvider.AllowedCreateCalls = 0 // Don't allow Create() calls to succeed + ExpectApplied(ctx, env.Client, nodePool, nodeClaim) + _ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim) + nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) + + // If the node hasn't registered in the registration timeframe, then we deprovision the nodeClaim + fakeClock.Step(time.Minute * 20) + _ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim) + + // NodeClaim registration failed, but we should not update the NodeRegistrationHealthy status condition if it is already True + operatorpkg.ExpectStatusConditions(ctx, env.Client, 1*time.Minute, nodePool, status.Condition{Type: v1.ConditionTypeNodeRegistrationHealthy, Status: metav1.ConditionTrue}) ExpectFinalizersRemoved(ctx, env.Client, nodeClaim) ExpectNotFound(ctx, env.Client, nodeClaim) }) diff --git a/pkg/controllers/nodeclaim/lifecycle/registration.go b/pkg/controllers/nodeclaim/lifecycle/registration.go index 0cbcaf156e..6c19bab9c5 100644 --- a/pkg/controllers/nodeclaim/lifecycle/registration.go +++ b/pkg/controllers/nodeclaim/lifecycle/registration.go @@ -20,6 +20,8 @@ import ( "context" "fmt" + "k8s.io/apimachinery/pkg/types" + "github.com/samber/lo" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" @@ -82,9 +84,37 @@ func (r *Registration) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) ( metrics.NodesCreatedTotal.Inc(map[string]string{ metrics.NodePoolLabel: nodeClaim.Labels[v1.NodePoolLabelKey], }) + if err := r.updateNodePoolRegistrationHealth(ctx, 
nodeClaim); err != nil { + if errors.IsConflict(err) { + return reconcile.Result{Requeue: true}, nil + } + return reconcile.Result{}, err + } return reconcile.Result{}, nil } +// updateNodePoolRegistrationHealth sets the NodeRegistrationHealthy=True +// on the NodePool if the nodeClaim that registered is owned by a NodePool +func (r *Registration) updateNodePoolRegistrationHealth(ctx context.Context, nodeClaim *v1.NodeClaim) error { + nodePoolName, ok := nodeClaim.Labels[v1.NodePoolLabelKey] + if ok && len(nodePoolName) != 0 { + nodePool := &v1.NodePool{} + if err := r.kubeClient.Get(ctx, types.NamespacedName{Name: nodePoolName}, nodePool); err != nil { + return err + } + storedNodePool := nodePool.DeepCopy() + if nodePool.StatusConditions().SetTrue(v1.ConditionTypeNodeRegistrationHealthy) { + // We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch + // can cause races due to the fact that it fully replaces the list on a change + // Here, we are updating the status condition list + if err := r.kubeClient.Status().Patch(ctx, nodePool, client.MergeFromWithOptions(storedNodePool, client.MergeFromWithOptimisticLock{})); client.IgnoreNotFound(err) != nil { + return err + } + } + } + return nil +} + func (r *Registration) syncNode(ctx context.Context, nodeClaim *v1.NodeClaim, node *corev1.Node) error { stored := node.DeepCopy() controllerutil.AddFinalizer(node, v1.TerminationFinalizer) diff --git a/pkg/controllers/nodeclaim/lifecycle/registration_test.go b/pkg/controllers/nodeclaim/lifecycle/registration_test.go index 9f9b93459e..d2714775ba 100644 --- a/pkg/controllers/nodeclaim/lifecycle/registration_test.go +++ b/pkg/controllers/nodeclaim/lifecycle/registration_test.go @@ -17,6 +17,10 @@ limitations under the License. package lifecycle_test import ( + "time" + + "github.com/awslabs/operatorpkg/status" + operatorpkg "github.com/awslabs/operatorpkg/test/expectations" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" @@ -54,6 +58,7 @@ var _ = Describe("Registration", func() { }) } nodeClaim := test.NodeClaim(nodeClaimOpts...) + nodePool.StatusConditions().SetUnknown(v1.ConditionTypeNodeRegistrationHealthy) ExpectApplied(ctx, env.Client, nodePool, nodeClaim) ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim) nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) @@ -66,6 +71,10 @@ var _ = Describe("Registration", func() { if isManagedNodeClaim { Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeRegistered).IsTrue()).To(BeTrue()) Expect(nodeClaim.Status.NodeName).To(Equal(node.Name)) + operatorpkg.ExpectStatusConditions(ctx, env.Client, 1*time.Minute, nodePool, status.Condition{ + Type: v1.ConditionTypeNodeRegistrationHealthy, + Status: metav1.ConditionTrue, + }) } else { Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeRegistered).IsUnknown()).To(BeTrue()) Expect(nodeClaim.Status.NodeName).To(Equal("")) @@ -368,4 +377,30 @@ var _ = Describe("Registration", func() { node = ExpectExists(ctx, env.Client, node) Expect(node.Spec.Taints).To(HaveLen(0)) }) + It("should add NodeRegistrationHealthy=true on the nodePool if registration succeeds and if it was previously false", func() { + nodeClaimOpts := []v1.NodeClaim{{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1.NodePoolLabelKey: nodePool.Name, + }, + }, + }} + nodeClaim := test.NodeClaim(nodeClaimOpts...) 
+ nodePool.StatusConditions().SetFalse(v1.ConditionTypeNodeRegistrationHealthy, "unhealthy", "unhealthy") + ExpectApplied(ctx, env.Client, nodePool, nodeClaim) + ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim) + nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) + + node := test.Node(test.NodeOptions{ProviderID: nodeClaim.Status.ProviderID, Taints: []corev1.Taint{v1.UnregisteredNoExecuteTaint}}) + ExpectApplied(ctx, env.Client, node) + ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim) + + nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) + Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeRegistered).IsTrue()).To(BeTrue()) + Expect(nodeClaim.Status.NodeName).To(Equal(node.Name)) + operatorpkg.ExpectStatusConditions(ctx, env.Client, 1*time.Minute, nodePool, status.Condition{ + Type: v1.ConditionTypeNodeRegistrationHealthy, + Status: metav1.ConditionTrue, + }) + }) }) diff --git a/pkg/controllers/nodepool/readiness/controller.go b/pkg/controllers/nodepool/readiness/controller.go index 7ec99f3d07..a2cbeb1cb3 100644 --- a/pkg/controllers/nodepool/readiness/controller.go +++ b/pkg/controllers/nodepool/readiness/controller.go @@ -19,9 +19,7 @@ package readiness import ( "context" - "github.com/awslabs/operatorpkg/object" "github.com/awslabs/operatorpkg/status" - "github.com/samber/lo" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" controllerruntime "sigs.k8s.io/controller-runtime" @@ -55,11 +53,8 @@ func (c *Controller) Reconcile(ctx context.Context, nodePool *v1.NodePool) (reco ctx = injection.WithControllerName(ctx, "nodepool.readiness") stored := nodePool.DeepCopy() - nodeClass, ok := lo.Find(c.cloudProvider.GetSupportedNodeClasses(), func(nc status.Object) bool { - return object.GVK(nc).GroupKind() == nodePool.Spec.Template.Spec.NodeClassRef.GroupKind() - }) - if !ok { - // Ignore NodePools which aren't using a supported NodeClass. + nodeClass := nodepoolutils.GetNodeClassStatusObject(nodePool, c.cloudProvider) + if nodeClass == nil { return reconcile.Result{}, nil } diff --git a/pkg/controllers/nodepool/registrationhealth/controller.go b/pkg/controllers/nodepool/registrationhealth/controller.go new file mode 100644 index 0000000000..db2d6f04d7 --- /dev/null +++ b/pkg/controllers/nodepool/registrationhealth/controller.go @@ -0,0 +1,94 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package registrationhealth + +import ( + "context" + + "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/errors" + + "sigs.k8s.io/karpenter/pkg/operator/injection" + + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + v1 "sigs.k8s.io/karpenter/pkg/apis/v1" + "sigs.k8s.io/karpenter/pkg/cloudprovider" + nodepoolutils "sigs.k8s.io/karpenter/pkg/utils/nodepool" +) + +// Controller for the resource +type Controller struct { + kubeClient client.Client + cloudProvider cloudprovider.CloudProvider +} + +// NewController will create a controller to reset NodePool's registration health when there is an update to NodePool/NodeClass spec +func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider) *Controller { + return &Controller{ + kubeClient: kubeClient, + cloudProvider: cloudProvider, + } +} + +func (c *Controller) Reconcile(ctx context.Context, nodePool *v1.NodePool) (reconcile.Result, error) { + ctx = injection.WithControllerName(ctx, "nodepool.registrationhealth") + + nodeClass := nodepoolutils.GetNodeClassStatusObject(nodePool, c.cloudProvider) + if nodeClass == nil { + return reconcile.Result{}, nil + } + if err := c.kubeClient.Get(ctx, client.ObjectKey{Name: nodePool.Spec.Template.Spec.NodeClassRef.Name}, nodeClass); err != nil { + return reconcile.Result{}, client.IgnoreNotFound(err) + } + stored := nodePool.DeepCopy() + + // If NodeClass/NodePool have been updated then NodeRegistrationHealthy = Unknown + if (nodePool.Status.NodeClassObservedGeneration != nodeClass.GetGeneration()) || + (nodePool.Generation != nodePool.StatusConditions().Get(v1.ConditionTypeNodeRegistrationHealthy).ObservedGeneration) { + nodePool.StatusConditions().SetUnknown(v1.ConditionTypeNodeRegistrationHealthy) + } + nodePool.Status.NodeClassObservedGeneration = nodeClass.GetGeneration() + if !equality.Semantic.DeepEqual(stored, nodePool) { + // We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch + // can cause races due to the fact that it fully replaces the list on a change + // Here, we are updating the status condition list + if err := c.kubeClient.Status().Patch(ctx, nodePool, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); client.IgnoreNotFound(err) != nil { + if errors.IsConflict(err) { + return reconcile.Result{Requeue: true}, nil + } + return reconcile.Result{}, err + } + } + return reconcile.Result{}, nil +} + +func (c *Controller) Register(_ context.Context, m manager.Manager) error { + b := controllerruntime.NewControllerManagedBy(m). + Named("nodepool.registrationhealth"). + For(&v1.NodePool{}, builder.WithPredicates(nodepoolutils.IsManagedPredicateFuncs(c.cloudProvider))). 
+ WithOptions(controller.Options{MaxConcurrentReconciles: 10}) + for _, nodeClass := range c.cloudProvider.GetSupportedNodeClasses() { + b.Watches(nodeClass, nodepoolutils.NodeClassEventHandler(c.kubeClient)) + } + return b.Complete(reconcile.AsReconciler(m.GetClient(), c)) +} diff --git a/pkg/controllers/nodepool/registrationhealth/suite_test.go b/pkg/controllers/nodepool/registrationhealth/suite_test.go new file mode 100644 index 0000000000..749cecfc5e --- /dev/null +++ b/pkg/controllers/nodepool/registrationhealth/suite_test.go @@ -0,0 +1,131 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registrationhealth_test + +import ( + "context" + "testing" + + "sigs.k8s.io/karpenter/pkg/controllers/nodepool/registrationhealth" + + "github.com/awslabs/operatorpkg/object" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "sigs.k8s.io/karpenter/pkg/apis" + v1 "sigs.k8s.io/karpenter/pkg/apis/v1" + "sigs.k8s.io/karpenter/pkg/cloudprovider/fake" + "sigs.k8s.io/karpenter/pkg/test" + . "sigs.k8s.io/karpenter/pkg/test/expectations" + "sigs.k8s.io/karpenter/pkg/test/v1alpha1" + . "sigs.k8s.io/karpenter/pkg/utils/testing" +) + +var ( + controller *registrationhealth.Controller + ctx context.Context + env *test.Environment + cloudProvider *fake.CloudProvider + nodePool *v1.NodePool + nodeClass *v1alpha1.TestNodeClass +) + +func TestAPIs(t *testing.T) { + ctx = TestContextWithLogger(t) + RegisterFailHandler(Fail) + RunSpecs(t, "RegistrationHealth") +} + +var _ = BeforeSuite(func() { + cloudProvider = fake.NewCloudProvider() + env = test.NewEnvironment(test.WithCRDs(apis.CRDs...), test.WithCRDs(v1alpha1.CRDs...)) + controller = registrationhealth.NewController(env.Client, cloudProvider) +}) +var _ = AfterEach(func() { + ExpectCleanedUp(ctx, env.Client) +}) + +var _ = AfterSuite(func() { + Expect(env.Stop()).To(Succeed(), "Failed to stop environment") +}) + +var _ = Describe("RegistrationHealth", func() { + BeforeEach(func() { + nodePool = test.NodePool() + nodeClass = test.NodeClass(v1alpha1.TestNodeClass{ + ObjectMeta: metav1.ObjectMeta{Name: nodePool.Spec.Template.Spec.NodeClassRef.Name}, + }) + nodePool.Spec.Template.Spec.NodeClassRef.Group = object.GVK(nodeClass).Group + nodePool.Spec.Template.Spec.NodeClassRef.Kind = object.GVK(nodeClass).Kind + _ = nodePool.StatusConditions().Clear(v1.ConditionTypeNodeRegistrationHealthy) + }) + It("should ignore setting NodeRegistrationHealthy status condition on NodePools which aren't managed by this instance of Karpenter", func() { + nodePool.Spec.Template.Spec.NodeClassRef = &v1.NodeClassReference{ + Group: "karpenter.test.sh", + Kind: "UnmanagedNodeClass", + Name: "default", + } + ExpectApplied(ctx, env.Client, nodePool, nodeClass) + _ = ExpectObjectReconciled(ctx, env.Client, controller, nodePool) + nodePool = ExpectExists(ctx, env.Client, nodePool) + 
Expect(nodePool.StatusConditions().Get(v1.ConditionTypeNodeRegistrationHealthy)).To(BeNil()) + }) + It("should not set NodeRegistrationHealthy status condition on nodePool when nodeClass does not exist", func() { + ExpectApplied(ctx, env.Client, nodePool) + ExpectObjectReconciled(ctx, env.Client, controller, nodePool) + nodePool = ExpectExists(ctx, env.Client, nodePool) + Expect(nodePool.StatusConditions().Get(v1.ConditionTypeNodeRegistrationHealthy)).To(BeNil()) + }) + It("should set NodeRegistrationHealthy status condition on nodePool as Unknown if the nodeClass observed generation doesn't match with that on nodePool", func() { + nodePool.StatusConditions().SetFalse(v1.ConditionTypeNodeRegistrationHealthy, "unhealthy", "unhealthy") + nodePool.Status.NodeClassObservedGeneration = int64(1) + ExpectApplied(ctx, env.Client, nodePool, nodeClass) + + nodePool.Spec.Limits = map[corev1.ResourceName]resource.Quantity{corev1.ResourceCPU: resource.MustParse("14")} + ExpectApplied(ctx, env.Client, nodePool, nodeClass) + _ = ExpectObjectReconciled(ctx, env.Client, controller, nodePool) + nodePool = ExpectExists(ctx, env.Client, nodePool) + Expect(nodePool.StatusConditions().Get(v1.ConditionTypeNodeRegistrationHealthy).IsUnknown()).To(BeTrue()) + Expect(nodePool.Status.NodeClassObservedGeneration).To(Equal(int64(1))) + }) + It("should set NodeRegistrationHealthy status condition on nodePool as Unknown if the nodePool is updated", func() { + ExpectApplied(ctx, env.Client, nodePool, nodeClass) + _ = ExpectObjectReconciled(ctx, env.Client, controller, nodePool) + nodePool = ExpectExists(ctx, env.Client, nodePool) + Expect(nodePool.StatusConditions().Get(v1.ConditionTypeNodeRegistrationHealthy).IsUnknown()).To(BeTrue()) + Expect(nodePool.Status.NodeClassObservedGeneration).To(Equal(int64(1))) + }) + It("should not set NodeRegistrationHealthy status condition on nodePool as Unknown if it is already set to true", func() { + nodePool.StatusConditions().SetTrue(v1.ConditionTypeNodeRegistrationHealthy) + nodePool.Status.NodeClassObservedGeneration = int64(1) + ExpectApplied(ctx, env.Client, nodePool, nodeClass) + _ = ExpectObjectReconciled(ctx, env.Client, controller, nodePool) + nodePool = ExpectExists(ctx, env.Client, nodePool) + Expect(nodePool.StatusConditions().Get(v1.ConditionTypeNodeRegistrationHealthy).IsUnknown()).To(BeFalse()) + }) + It("should not set NodeRegistrationHealthy status condition on nodePool as Unknown if it is already set to false", func() { + nodePool.StatusConditions().SetFalse(v1.ConditionTypeNodeRegistrationHealthy, "unhealthy", "unhealthy") + nodePool.Status.NodeClassObservedGeneration = int64(1) + ExpectApplied(ctx, env.Client, nodePool, nodeClass) + _ = ExpectObjectReconciled(ctx, env.Client, controller, nodePool) + nodePool = ExpectExists(ctx, env.Client, nodePool) + Expect(nodePool.StatusConditions().Get(v1.ConditionTypeNodeRegistrationHealthy).IsUnknown()).To(BeFalse()) + }) +}) diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go index ef2b5b52f5..c813a60fa9 100644 --- a/pkg/controllers/provisioning/provisioner.go +++ b/pkg/controllers/provisioning/provisioner.go @@ -165,7 +165,7 @@ func (p *Provisioner) GetPendingPods(ctx context.Context) ([]*corev1.Pod, error) rejectedPods, pods := lo.FilterReject(pods, func(po *corev1.Pod, _ int) bool { if err := p.Validate(ctx, po); err != nil { // Mark in memory that this pod is unschedulable - p.cluster.MarkPodSchedulingDecisions(map[*corev1.Pod]error{po: fmt.Errorf("ignoring pod, %w", 
err)}, po) + p.cluster.MarkPodSchedulingDecisions(ctx, map[*corev1.Pod]error{po: fmt.Errorf("ignoring pod, %w", err)}, po) log.FromContext(ctx).WithValues("Pod", klog.KRef(po.Namespace, po.Name)).V(1).Info(fmt.Sprintf("ignoring pod, %s", err)) return true } @@ -304,7 +304,7 @@ func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) { if errors.Is(err, ErrNodePoolsNotFound) { log.FromContext(ctx).Info("no nodepools found") // Mark in memory that these pods are unschedulable - p.cluster.MarkPodSchedulingDecisions(lo.SliceToMap(pods, func(p *corev1.Pod) (*corev1.Pod, error) { + p.cluster.MarkPodSchedulingDecisions(ctx, lo.SliceToMap(pods, func(p *corev1.Pod) (*corev1.Pod, error) { return p, fmt.Errorf("no nodepools found") }), pods...) return scheduler.Results{}, nil @@ -317,7 +317,7 @@ func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) { log.FromContext(ctx).WithValues("Pods", pretty.Slice(lo.Map(pods, func(p *corev1.Pod, _ int) string { return klog.KRef(p.Namespace, p.Name).String() }), 5), "duration", time.Since(start)).Info("found provisionable pod(s)") } // Mark in memory when these pods were marked as schedulable or when we made a decision on the pods - p.cluster.MarkPodSchedulingDecisions(results.PodErrors, pendingPods...) + p.cluster.MarkPodSchedulingDecisions(ctx, results.PodErrors, pendingPods...) results.Record(ctx, p.recorder, p.cluster) return results, nil } diff --git a/pkg/controllers/provisioning/scheduling/scheduler.go b/pkg/controllers/provisioning/scheduling/scheduler.go index 398942c536..1919beef45 100644 --- a/pkg/controllers/provisioning/scheduling/scheduler.go +++ b/pkg/controllers/provisioning/scheduling/scheduler.go @@ -302,6 +302,7 @@ func (s *Scheduler) add(ctx context.Context, pod *corev1.Pod) error { // Pick existing node that we are about to create for _, nodeClaim := range s.newNodeClaims { if err := nodeClaim.Add(pod, s.cachedPodData[pod.UID]); err == nil { + s.cluster.MarkPodToNodePoolSchedulingDecision(pod, nodeClaim.Labels[v1.NodePoolLabelKey]) return nil } } @@ -333,6 +334,7 @@ func (s *Scheduler) add(ctx context.Context, pod *corev1.Pod) error { // we will launch this nodeClaim and need to track its maximum possible resource usage against our remaining resources s.newNodeClaims = append(s.newNodeClaims, nodeClaim) s.remainingResources[nodeClaimTemplate.NodePoolName] = subtractMax(s.remainingResources[nodeClaimTemplate.NodePoolName], nodeClaim.InstanceTypeOptions) + s.cluster.MarkPodToNodePoolSchedulingDecision(pod, nodeClaim.Labels[v1.NodePoolLabelKey]) return nil } return errs diff --git a/pkg/controllers/provisioning/scheduling/suite_test.go b/pkg/controllers/provisioning/scheduling/suite_test.go index 79054a1733..81bf4a6beb 100644 --- a/pkg/controllers/provisioning/scheduling/suite_test.go +++ b/pkg/controllers/provisioning/scheduling/suite_test.go @@ -406,7 +406,7 @@ var _ = Context("Scheduling", func() { cluster.AckPods(pod) ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pod) ExpectNotScheduled(ctx, env.Client, pod) - Expect(cluster.PodSchedulingSuccessTime(nn).IsZero()).To(BeTrue()) + Expect(cluster.PodSchedulingSuccessTime(nn, false).IsZero()).To(BeTrue()) Expect(cluster.PodSchedulingDecisionTime(nn).IsZero()).To(BeFalse()) ExpectMetricHistogramSampleCountValue("karpenter_pods_scheduling_decision_duration_seconds", 1, nil) } diff --git a/pkg/controllers/provisioning/suite_test.go b/pkg/controllers/provisioning/suite_test.go index 1174ef973d..a47e6ed053 100644 --- 
a/pkg/controllers/provisioning/suite_test.go +++ b/pkg/controllers/provisioning/suite_test.go @@ -241,7 +241,7 @@ var _ = Describe("Provisioning", func() { // Provisioning should fail here since there are no valid nodePools to schedule the pod ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pod) ExpectNotScheduled(ctx, env.Client, pod) - Expect(cluster.PodSchedulingSuccessTime(nn).IsZero()).To(BeTrue()) + Expect(cluster.PodSchedulingSuccessTime(nn, false).IsZero()).To(BeTrue()) Expect(cluster.PodSchedulingDecisionTime(nn).IsZero()).To(BeFalse()) ExpectMetricHistogramSampleCountValue("karpenter_pods_scheduling_decision_duration_seconds", 1, nil) }) diff --git a/pkg/controllers/state/cluster.go b/pkg/controllers/state/cluster.go index b1a7e8853f..d3c977936d 100644 --- a/pkg/controllers/state/cluster.go +++ b/pkg/controllers/state/cluster.go @@ -58,9 +58,11 @@ type Cluster struct { nodeClaimNameToProviderID map[string]string // node claim name -> provider id daemonSetPods sync.Map // daemonSet -> existing pod - podAcks sync.Map // pod namespaced name -> time when Karpenter first saw the pod as pending - podsSchedulingAttempted sync.Map // pod namespaced name -> time when Karpenter tried to schedule a pod - podsSchedulableTimes sync.Map // pod namespaced name -> time when it was first marked as able to fit to a node + podAcks sync.Map // pod namespaced name -> time when Karpenter first saw the pod as pending + podsSchedulingAttempted sync.Map // pod namespaced name -> time when Karpenter tried to schedule a pod + podsSchedulableTimes sync.Map // pod namespaced name -> time when it was first marked as able to fit to a node + podToNodePool sync.Map // pod namespaced name -> nodePool where it can be scheduled + podHealthyNodePoolScheduledTime sync.Map // pod namespaced name -> time when the pod was first marked as able to fit to a nodePool that has NodeRegistrationHealthy=true clusterStateMu sync.RWMutex // Separate mutex as this is called in some places that mu is held // A monotonically increasing timestamp representing the time state of the @@ -83,9 +85,12 @@ func NewCluster(clk clock.Clock, client client.Client, cloudProvider cloudprovid daemonSetPods: sync.Map{}, nodeNameToProviderID: map[string]string{}, nodeClaimNameToProviderID: map[string]string{}, - podAcks: sync.Map{}, - podsSchedulableTimes: sync.Map{}, - podsSchedulingAttempted: sync.Map{}, + + podAcks: sync.Map{}, + podsSchedulableTimes: sync.Map{}, + podsSchedulingAttempted: sync.Map{}, + podToNodePool: sync.Map{}, + podHealthyNodePoolScheduledTime: sync.Map{}, } } @@ -365,13 +370,28 @@ func (c *Cluster) PodAckTime(podKey types.NamespacedName) time.Time { // MarkPodSchedulingDecisions keeps track of when we first tried to schedule a pod to a node. // This also marks when the pod is first seen as schedulable for pod metrics. // We'll only emit a metric for a pod if we haven't done it before. -func (c *Cluster) MarkPodSchedulingDecisions(podErrors map[*corev1.Pod]error, pods ...*corev1.Pod) { +func (c *Cluster) MarkPodSchedulingDecisions(ctx context.Context, podErrors map[*corev1.Pod]error, pods ...*corev1.Pod) { now := c.clock.Now() for _, p := range pods { nn := client.ObjectKeyFromObject(p) // If there's no error for the pod, then we mark it as schedulable if err, ok := podErrors[p]; !ok || err == nil { c.podsSchedulableTimes.LoadOrStore(nn, now) + // If the pod is scheduled to a nodePool that has NodeRegistrationHealthy=true, + // then record now as the time we first thought the pod could schedule.
+ val, found := c.podToNodePool.Load(nn) + if found { + nodePool := &v1.NodePool{} + if err := c.kubeClient.Get(ctx, types.NamespacedName{Name: val.(string)}, nodePool); err == nil { + if nodePool.StatusConditions().IsTrue(v1.ConditionTypeNodeRegistrationHealthy) { + c.podHealthyNodePoolScheduledTime.Store(nn, now) + } else { + // If the pod was scheduled to a healthy nodePool earlier but is now getting scheduled to an + // unhealthy one then we need to delete its entry from the map because it will not schedule successfully + c.podHealthyNodePoolScheduledTime.Delete(nn) + } + } + } } _, alreadyExists := c.podsSchedulingAttempted.LoadOrStore(nn, now) // If we already attempted this, we don't need to emit another metric. @@ -384,6 +404,11 @@ func (c *Cluster) MarkPodSchedulingDecisions(podErrors map[*corev1.Pod]error, po } } +// MarkPodToNodePoolSchedulingDecision keeps track of the NodePool where we try to schedule a pod +func (c *Cluster) MarkPodToNodePoolSchedulingDecision(pod *corev1.Pod, nodePool string) { + c.podToNodePool.Store(client.ObjectKeyFromObject(pod), nodePool) +} + // PodSchedulingDecisionTime returns when Karpenter first decided if a pod could schedule a pod in scheduling simulations. // This returns 0, false if Karpenter never made a decision on the pod. func (c *Cluster) PodSchedulingDecisionTime(podKey types.NamespacedName) time.Time { @@ -395,9 +420,15 @@ func (c *Cluster) PodSchedulingDecisionTime(podKey types.NamespacedName) time.Ti // PodSchedulingSuccessTime returns when Karpenter first thought it could schedule a pod in its scheduling simulation. // This returns 0, false if the pod was never considered in scheduling as a pending pod. -func (c *Cluster) PodSchedulingSuccessTime(podKey types.NamespacedName) time.Time { - if val, found := c.podsSchedulableTimes.Load(podKey); found { - return val.(time.Time) +func (c *Cluster) PodSchedulingSuccessTime(podKey types.NamespacedName, registrationHealthyCheck bool) time.Time { + if registrationHealthyCheck { + if val, found := c.podHealthyNodePoolScheduledTime.Load(podKey); found { + return val.(time.Time) + } + } else { + if val, found := c.podsSchedulableTimes.Load(podKey); found { + return val.(time.Time) + } } return time.Time{} } @@ -416,6 +447,8 @@ func (c *Cluster) ClearPodSchedulingMappings(podKey types.NamespacedName) { c.podAcks.Delete(podKey) c.podsSchedulableTimes.Delete(podKey) c.podsSchedulingAttempted.Delete(podKey) + c.podToNodePool.Delete(podKey) + c.podHealthyNodePoolScheduledTime.Delete(podKey) } // MarkUnconsolidated marks the cluster state as being unconsolidated. 
This should be called in any situation where diff --git a/pkg/controllers/state/suite_test.go b/pkg/controllers/state/suite_test.go index 6f2773221c..a032042ab2 100644 --- a/pkg/controllers/state/suite_test.go +++ b/pkg/controllers/state/suite_test.go @@ -99,6 +99,55 @@ var _ = AfterEach(func() { cluster.Reset() cloudProvider.Reset() }) +var _ = Describe("Pod Healthy NodePool", func() { + It("should not store pod schedulable time if the nodePool that pod is scheduled to does not have NodeRegistrationHealthy=true", func() { + pod := test.Pod() + ExpectApplied(ctx, env.Client, pod, nodePool) + nn := client.ObjectKeyFromObject(pod) + + cluster.MarkPodToNodePoolSchedulingDecision(pod, nodePool.Name) + setTime := cluster.PodSchedulingSuccessTime(nn, true) + Expect(setTime.IsZero()).To(BeTrue()) + + cluster.MarkPodSchedulingDecisions(ctx, map[*corev1.Pod]error{}, pod) + setTime = cluster.PodSchedulingSuccessTime(nn, true) + Expect(setTime.IsZero()).To(BeTrue()) + }) + It("should store pod schedulable time if the nodePool that pod is scheduled to has NodeRegistrationHealthy=true", func() { + pod := test.Pod() + nodePool.StatusConditions().SetTrue(v1.ConditionTypeNodeRegistrationHealthy) + ExpectApplied(ctx, env.Client, pod, nodePool) + nn := client.ObjectKeyFromObject(pod) + + cluster.MarkPodToNodePoolSchedulingDecision(pod, nodePool.Name) + setTime := cluster.PodSchedulingSuccessTime(nn, true) + Expect(setTime.IsZero()).To(BeTrue()) + + cluster.MarkPodSchedulingDecisions(ctx, map[*corev1.Pod]error{}, pod) + setTime = cluster.PodSchedulingSuccessTime(nn, true) + Expect(setTime.IsZero()).To(BeFalse()) + }) + It("should delete pod schedulable time if the pod was scheduled to a healthy nodePool but now is scheduled to an unhealthy nodePool", func() { + pod := test.Pod() + nodePool.StatusConditions().SetTrue(v1.ConditionTypeNodeRegistrationHealthy) + ExpectApplied(ctx, env.Client, pod, nodePool) + nn := client.ObjectKeyFromObject(pod) + + cluster.MarkPodToNodePoolSchedulingDecision(pod, nodePool.Name) + setTime := cluster.PodSchedulingSuccessTime(nn, true) + Expect(setTime.IsZero()).To(BeTrue()) + + cluster.MarkPodSchedulingDecisions(ctx, map[*corev1.Pod]error{}, pod) + setTime = cluster.PodSchedulingSuccessTime(nn, true) + Expect(setTime.IsZero()).To(BeFalse()) + + nodePool.StatusConditions().SetUnknown(v1.ConditionTypeNodeRegistrationHealthy) + ExpectApplied(ctx, env.Client, nodePool) + cluster.MarkPodSchedulingDecisions(ctx, map[*corev1.Pod]error{}, pod) + setTime = cluster.PodSchedulingSuccessTime(nn, true) + Expect(setTime.IsZero()).To(BeTrue()) + }) +}) var _ = Describe("Pod Ack", func() { It("should only mark pods as schedulable once", func() { @@ -106,14 +155,14 @@ var _ = Describe("Pod Ack", func() { ExpectApplied(ctx, env.Client, pod) nn := client.ObjectKeyFromObject(pod) - setTime := cluster.PodSchedulingSuccessTime(nn) + setTime := cluster.PodSchedulingSuccessTime(nn, false) Expect(setTime.IsZero()).To(BeTrue()) - cluster.MarkPodSchedulingDecisions(map[*corev1.Pod]error{}, pod) - setTime = cluster.PodSchedulingSuccessTime(nn) + cluster.MarkPodSchedulingDecisions(ctx, map[*corev1.Pod]error{}, pod) + setTime = cluster.PodSchedulingSuccessTime(nn, false) Expect(setTime.IsZero()).To(BeFalse()) - newTime := cluster.PodSchedulingSuccessTime(nn) + newTime := cluster.PodSchedulingSuccessTime(nn, false) Expect(newTime.Compare(setTime)).To(Equal(0)) }) }) diff --git a/pkg/utils/nodepool/nodepool.go b/pkg/utils/nodepool/nodepool.go index 298f133dbc..ef2f523175 100644 --- 
a/pkg/utils/nodepool/nodepool.go +++ b/pkg/utils/nodepool/nodepool.go @@ -39,6 +39,15 @@ func IsManaged(nodePool *v1.NodePool, cp cloudprovider.CloudProvider) bool { }) } +func GetNodeClassStatusObject(nodePool *v1.NodePool, cp cloudprovider.CloudProvider) status.Object { + if nodeClass, ok := lo.Find(cp.GetSupportedNodeClasses(), func(nodeClass status.Object) bool { + return object.GVK(nodeClass).GroupKind() == nodePool.Spec.Template.Spec.NodeClassRef.GroupKind() + }); ok { + return nodeClass + } + return nil +} + // IsManagedPredicateFuncs is used to filter controller-runtime NodeClaim watches to NodeClaims managed by the given cloudprovider. func IsManagedPredicateFuncs(cp cloudprovider.CloudProvider) predicate.Funcs { return predicate.NewPredicateFuncs(func(o client.Object) bool {
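Reviewer note (not part of the patch): the status writes in liveness.go, registration.go, and the new registrationhealth controller all follow the same deep-copy + optimistic-lock status-patch shape. Below is a minimal, self-contained sketch of that pattern for reference only; the package name `example` and helper `markRegistrationHealthy` are hypothetical, and it assumes the karpenter v1 API and operatorpkg condition helpers already used in this diff.

package example // hypothetical helper, not part of this PR

import (
	"context"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"sigs.k8s.io/controller-runtime/pkg/client"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// markRegistrationHealthy mirrors the pattern used in registration.go: mutate only the
// status condition on top of a deep-copied base, then patch status with an optimistic
// lock so concurrent writers to the condition list surface as Conflict errors instead
// of silently overwriting each other. It returns whether the caller should requeue.
func markRegistrationHealthy(ctx context.Context, kubeClient client.Client, nodePool *v1.NodePool) (bool, error) {
	stored := nodePool.DeepCopy()
	// SetTrue reports whether the condition actually changed; skip the API call if it did not.
	if !nodePool.StatusConditions().SetTrue(v1.ConditionTypeNodeRegistrationHealthy) {
		return false, nil
	}
	// MergeFromWithOptimisticLock includes resourceVersion in the patch, so a concurrent
	// update to the NodePool causes the API server to reject this write with a Conflict.
	err := kubeClient.Status().Patch(ctx, nodePool, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{}))
	if apierrors.IsConflict(err) {
		// The callers in this PR requeue on conflict rather than returning the error.
		return true, nil
	}
	return false, client.IgnoreNotFound(err)
}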