
feat: add NodeRegistrationHealthy status condition to nodepool
jigisha620 committed Feb 20, 2025
1 parent 5687906 commit 810ad69
Showing 20 changed files with 518 additions and 33 deletions.
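The changes below add the NodeRegistrationHealthy status condition and a NodeClassObservedGeneration field to the NodePool CRD, set the condition from the NodeClaim lifecycle controllers (liveness and registration), and register a new nodepool/registrationhealth controller. As an illustrative sketch only (not part of this commit; the helper name and client wiring are assumptions), a caller can read the new condition through the same status-conditions API the controllers below use:

package example // illustrative sketch, not part of this commit

import (
	"context"
	"fmt"

	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// checkRegistrationHealth is a hypothetical helper; it reads the new condition the same way
// the controllers in this diff do, via StatusConditions().Get(...).
func checkRegistrationHealth(ctx context.Context, kubeClient client.Client, nodePoolName string) error {
	nodePool := &v1.NodePool{}
	if err := kubeClient.Get(ctx, types.NamespacedName{Name: nodePoolName}, nodePool); err != nil {
		return err
	}
	cond := nodePool.StatusConditions().Get(v1.ConditionTypeNodeRegistrationHealthy)
	switch {
	case cond.IsTrue():
		fmt.Println("nodes from this NodePool have been launching and registering successfully")
	case cond.IsUnknown():
		fmt.Println("no launch/registration outcome has been observed since the condition was last reset")
	default:
		// False: a launch or registration failure was recorded; Reason and Message carry the detail
		fmt.Printf("registration unhealthy: %s: %s\n", cond.Reason, cond.Message)
	}
	return nil
}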
6 changes: 6 additions & 0 deletions kwok/charts/crds/karpenter.sh_nodepools.yaml
@@ -498,6 +498,12 @@ spec:
- type
type: object
type: array
nodeClassObservedGeneration:
description: |-
NodeClassObservedGeneration represents the observed generation of the referenced NodeClass. If this does not match
the actual NodeClass generation, the NodeRegistrationHealthy status condition on the NodePool will be reset
format: int64
type: integer
resources:
additionalProperties:
anyOf:
6 changes: 6 additions & 0 deletions pkg/apis/crds/karpenter.sh_nodepools.yaml
@@ -496,6 +496,12 @@ spec:
- type
type: object
type: array
nodeClassObservedGeneration:
description: |-
NodeClassObservedGeneration represents the observed generation of the referenced NodeClass. If this does not match
the actual NodeClass generation, the NodeRegistrationHealthy status condition on the NodePool will be reset
format: int64
type: integer
resources:
additionalProperties:
anyOf:
6 changes: 6 additions & 0 deletions pkg/apis/v1/nodepool_status.go
@@ -27,13 +27,19 @@ const (
ConditionTypeValidationSucceeded = "ValidationSucceeded"
// ConditionTypeNodeClassReady = "NodeClassReady" condition indicates that underlying nodeClass was resolved and is reporting as Ready
ConditionTypeNodeClassReady = "NodeClassReady"
// ConditionTypeNodeRegistrationHealthy = "NodeRegistrationHealthy" condition indicates whether a misconfiguration exists that is preventing successful node launches/registrations and requires manual investigation
ConditionTypeNodeRegistrationHealthy = "NodeRegistrationHealthy"
)

// NodePoolStatus defines the observed state of NodePool
type NodePoolStatus struct {
// Resources is the list of resources that have been provisioned.
// +optional
Resources v1.ResourceList `json:"resources,omitempty"`
// NodeClassObservedGeneration represents the observed generation of the referenced NodeClass. If this does not match
// the actual NodeClass generation, the NodeRegistrationHealthy status condition on the NodePool will be reset
// +optional
NodeClassObservedGeneration int64 `json:"nodeClassObservedGeneration,omitempty"`
// Conditions contains signals for health and readiness
// +optional
Conditions []status.Condition `json:"conditions,omitempty"`
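The reset mentioned in the comment above is handled by the new nodepool/registrationhealth controller registered below in controllers.go; that controller's file is not part of this excerpt, so the following is only an assumed sketch of the generation check it implies (the helper name is hypothetical; v1 and status are the packages already used throughout this diff):

// resetOnNodeClassChange is an assumed sketch, not the committed registrationhealth controller.
func resetOnNodeClassChange(nodePool *v1.NodePool, nodeClass status.Object) {
	if nodePool.Status.NodeClassObservedGeneration != nodeClass.GetGeneration() {
		// The referenced NodeClass changed, so earlier launch/registration outcomes may no longer apply;
		// clear the condition back to Unknown and record the generation that was observed.
		nodePool.StatusConditions().SetUnknown(v1.ConditionTypeNodeRegistrationHealthy)
		nodePool.Status.NodeClassObservedGeneration = nodeClass.GetGeneration()
	}
}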
2 changes: 2 additions & 0 deletions pkg/controllers/controllers.go
@@ -50,6 +50,7 @@ import (
nodepoolcounter "sigs.k8s.io/karpenter/pkg/controllers/nodepool/counter"
nodepoolhash "sigs.k8s.io/karpenter/pkg/controllers/nodepool/hash"
nodepoolreadiness "sigs.k8s.io/karpenter/pkg/controllers/nodepool/readiness"
nodepoolregistrationhealth "sigs.k8s.io/karpenter/pkg/controllers/nodepool/registrationhealth"
nodepoolvalidation "sigs.k8s.io/karpenter/pkg/controllers/nodepool/validation"
"sigs.k8s.io/karpenter/pkg/controllers/provisioning"
"sigs.k8s.io/karpenter/pkg/controllers/state"
@@ -88,6 +89,7 @@ func NewControllers(
metricsnodepool.NewController(kubeClient, cloudProvider),
metricsnode.NewController(cluster),
nodepoolreadiness.NewController(kubeClient, cloudProvider),
nodepoolregistrationhealth.NewController(kubeClient, cloudProvider),
nodepoolcounter.NewController(kubeClient, cloudProvider, cluster),
nodepoolvalidation.NewController(kubeClient, cloudProvider),
podevents.NewController(clock, kubeClient, cloudProvider),
2 changes: 1 addition & 1 deletion pkg/controllers/metrics/pod/controller.go
@@ -252,7 +252,7 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco
})
c.recordPodSchedulingUndecidedMetric(pod)
// Get the time when Karpenter first thought the pod was schedulable. This should be zero if we didn't simulate for this pod.
schedulableTime := c.cluster.PodSchedulingSuccessTime(types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace})
schedulableTime := c.cluster.PodSchedulingSuccessTime(types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, false)
c.recordPodStartupMetric(pod, schedulableTime)
c.recordPodBoundMetric(pod, schedulableTime)
// Requeue every 30s for pods that are stuck without a state change
8 changes: 4 additions & 4 deletions pkg/controllers/metrics/pod/suite_test.go
@@ -105,7 +105,7 @@ var _ = Describe("Pod Metrics", func() {
p.Status.Phase = corev1.PodPending

fakeClock.Step(1 * time.Hour)
cluster.MarkPodSchedulingDecisions(map[*corev1.Pod]error{}, p)
cluster.MarkPodSchedulingDecisions(ctx, map[*corev1.Pod]error{}, p)

// PodScheduled condition does not exist, emit pods_unbound_time_seconds metric
ExpectApplied(ctx, env.Client, p)
@@ -182,7 +182,7 @@ var _ = Describe("Pod Metrics", func() {
p.Status.Phase = corev1.PodPending

fakeClock.Step(1 * time.Hour)
cluster.MarkPodSchedulingDecisions(map[*corev1.Pod]error{}, p)
cluster.MarkPodSchedulingDecisions(ctx, map[*corev1.Pod]error{}, p)
ExpectApplied(ctx, env.Client, p)
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) //This will add pod to pending pods and unscheduled pods set
_, found := FindMetricWithLabelValues("karpenter_pods_unstarted_time_seconds", map[string]string{
@@ -271,7 +271,7 @@ var _ = Describe("Pod Metrics", func() {
})
Expect(found).To(BeTrue())

cluster.MarkPodSchedulingDecisions(map[*corev1.Pod]error{}, p)
cluster.MarkPodSchedulingDecisions(ctx, map[*corev1.Pod]error{}, p)
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p))

_, found = FindMetricWithLabelValues("karpenter_pods_scheduling_undecided_time_seconds", map[string]string{
@@ -318,7 +318,7 @@ var _ = Describe("Pod Metrics", func() {
p.Status.Phase = corev1.PodPending
ExpectApplied(ctx, env.Client, p)

cluster.MarkPodSchedulingDecisions(map[*corev1.Pod]error{}, p)
cluster.MarkPodSchedulingDecisions(ctx, map[*corev1.Pod]error{}, p)
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p))

_, found := FindMetricWithLabelValues("karpenter_pods_unbound_time_seconds", map[string]string{
43 changes: 41 additions & 2 deletions pkg/controllers/nodeclaim/lifecycle/liveness.go
@@ -20,9 +20,14 @@ import (
"context"
"time"

"k8s.io/apimachinery/pkg/api/errors"

"k8s.io/apimachinery/pkg/types"

"sigs.k8s.io/controller-runtime/pkg/log"

"k8s.io/utils/clock"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
@@ -51,6 +56,12 @@ func (l *Liveness) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (reco
if ttl := registrationTTL - l.clock.Since(registered.LastTransitionTime.Time); ttl > 0 {
return reconcile.Result{RequeueAfter: ttl}, nil
}
if err := l.updateNodePoolRegistrationHealth(ctx, nodeClaim); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, err
}
// Delete the NodeClaim if we believe the NodeClaim won't register since we haven't seen the node
if err := l.kubeClient.Delete(ctx, nodeClaim); err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
@@ -61,6 +72,34 @@ func (l *Liveness) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (reco
metrics.NodePoolLabel: nodeClaim.Labels[v1.NodePoolLabelKey],
metrics.CapacityTypeLabel: nodeClaim.Labels[v1.CapacityTypeLabelKey],
})

return reconcile.Result{}, nil
}

// updateNodePoolRegistrationHealth sets the NodeRegistrationHealthy=False
// on the NodePool if the nodeClaim fails to launch/register
func (l *Liveness) updateNodePoolRegistrationHealth(ctx context.Context, nodeClaim *v1.NodeClaim) error {
nodePoolName, ok := nodeClaim.Labels[v1.NodePoolLabelKey]
if ok && len(nodePoolName) != 0 {
nodePool := &v1.NodePool{}
if err := l.kubeClient.Get(ctx, types.NamespacedName{Name: nodePoolName}, nodePool); err != nil {
return err
}
if nodePool.StatusConditions().Get(v1.ConditionTypeNodeRegistrationHealthy).IsUnknown() {
stored := nodePool.DeepCopy()
// If the nodeClaim failed to register within the registration TTL, set the NodeRegistrationHealthy status condition
// on the NodePool to False. If the launch failed, use the launch failure reason and message from the nodeClaim.
if launchCondition := nodeClaim.StatusConditions().Get(v1.ConditionTypeLaunched); launchCondition.IsTrue() {
nodePool.StatusConditions().SetFalse(v1.ConditionTypeNodeRegistrationHealthy, "RegistrationFailed", "Failed to register node")
} else {
nodePool.StatusConditions().SetFalse(v1.ConditionTypeNodeRegistrationHealthy, launchCondition.Reason, launchCondition.Message)
}
// We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
// can cause races due to the fact that it fully replaces the list on a change
// Here, we are updating the status condition list
if err := l.kubeClient.Status().Patch(ctx, nodePool, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); client.IgnoreNotFound(err) != nil {
return err
}
}
}
return nil
}
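
The patch above uses client.MergeFromWithOptimisticLock because a plain JSON merge patch replaces the whole status.conditions list, so two controllers patching conditions concurrently could silently drop each other's updates; with the optimistic lock the patch carries the resourceVersion and a concurrent write surfaces as a conflict that Reconcile requeues. A condensed, generic sketch of that pattern (the helper and its mutate argument are stand-ins, not code from this commit; imports match those already used in liveness.go):

// patchNodePoolStatus is an illustrative stand-in showing the copy/mutate/patch/requeue-on-conflict pattern.
func patchNodePoolStatus(ctx context.Context, kubeClient client.Client, nodePool *v1.NodePool, mutate func(*v1.NodePool)) (reconcile.Result, error) {
	stored := nodePool.DeepCopy()
	mutate(nodePool) // e.g. flip a status condition
	// The optimistic lock adds metadata.resourceVersion to the merge patch, so a concurrent update
	// of the conditions list fails with a conflict instead of being overwritten.
	err := kubeClient.Status().Patch(ctx, nodePool, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{}))
	if errors.IsConflict(err) {
		return reconcile.Result{Requeue: true}, nil // retry against a fresh read
	}
	return reconcile.Result{}, client.IgnoreNotFound(err)
}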
48 changes: 48 additions & 0 deletions pkg/controllers/nodeclaim/lifecycle/liveness_test.go
@@ -19,6 +19,9 @@ package lifecycle_test
import (
"time"

"github.com/awslabs/operatorpkg/status"

operatorpkg "github.com/awslabs/operatorpkg/test/expectations"
. "github.com/onsi/ginkgo/v2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
@@ -78,6 +81,12 @@ var _ = Describe("Liveness", func() {
ExpectFinalizersRemoved(ctx, env.Client, nodeClaim)
if isManagedNodeClaim {
ExpectNotFound(ctx, env.Client, nodeClaim)
operatorpkg.ExpectStatusConditions(ctx, env.Client, 1*time.Minute, nodePool, status.Condition{
Type: v1.ConditionTypeNodeRegistrationHealthy,
Status: metav1.ConditionFalse,
Reason: "RegistrationFailed",
Message: "Failed to register node",
})
} else {
ExpectExists(ctx, env.Client, nodeClaim)
}
@@ -141,6 +150,45 @@ var _ = Describe("Liveness", func() {
// If the node hasn't registered in the registration timeframe, then we deprovision the nodeClaim
fakeClock.Step(time.Minute * 20)
_ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim)
operatorpkg.ExpectStatusConditions(ctx, env.Client, 1*time.Minute, nodePool, status.Condition{
Type: v1.ConditionTypeNodeRegistrationHealthy,
Status: metav1.ConditionFalse,
Reason: nodeClaim.StatusConditions().Get(v1.ConditionTypeLaunched).Reason,
Message: nodeClaim.StatusConditions().Get(v1.ConditionTypeLaunched).Message,
})
ExpectFinalizersRemoved(ctx, env.Client, nodeClaim)
ExpectNotFound(ctx, env.Client, nodeClaim)
})
It("should not update NodeRegistrationHealthy status condition if it is already set to True", func() {
nodePool.StatusConditions().SetTrue(v1.ConditionTypeNodeRegistrationHealthy)
nodeClaim := test.NodeClaim(v1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
v1.NodePoolLabelKey: nodePool.Name,
},
},
Spec: v1.NodeClaimSpec{
Resources: v1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourceMemory: resource.MustParse("50Mi"),
corev1.ResourcePods: resource.MustParse("5"),
fake.ResourceGPUVendorA: resource.MustParse("1"),
},
},
},
})
cloudProvider.AllowedCreateCalls = 0 // Don't allow Create() calls to succeed
ExpectApplied(ctx, env.Client, nodePool, nodeClaim)
_ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)

// If the node hasn't registered in the registration timeframe, then we deprovision the nodeClaim
fakeClock.Step(time.Minute * 20)
_ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim)

// NodeClaim registration failed, but we should not update the NodeRegistrationHealthy status condition if it is already True
operatorpkg.ExpectStatusConditions(ctx, env.Client, 1*time.Minute, nodePool, status.Condition{Type: v1.ConditionTypeNodeRegistrationHealthy, Status: metav1.ConditionTrue})
ExpectFinalizersRemoved(ctx, env.Client, nodeClaim)
ExpectNotFound(ctx, env.Client, nodeClaim)
})
30 changes: 30 additions & 0 deletions pkg/controllers/nodeclaim/lifecycle/registration.go
@@ -20,6 +20,8 @@ import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/types"

"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
@@ -82,9 +84,37 @@ func (r *Registration) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (
metrics.NodesCreatedTotal.Inc(map[string]string{
metrics.NodePoolLabel: nodeClaim.Labels[v1.NodePoolLabelKey],
})
if err := r.updateNodePoolRegistrationHealth(ctx, nodeClaim); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, err
}
return reconcile.Result{}, nil
}

// updateNodePoolRegistrationHealth sets the NodeRegistrationHealthy=True
// on the NodePool if the nodeClaim that registered is owned by a NodePool
func (r *Registration) updateNodePoolRegistrationHealth(ctx context.Context, nodeClaim *v1.NodeClaim) error {
nodePoolName, ok := nodeClaim.Labels[v1.NodePoolLabelKey]
if ok && len(nodePoolName) != 0 {
nodePool := &v1.NodePool{}
if err := r.kubeClient.Get(ctx, types.NamespacedName{Name: nodePoolName}, nodePool); err != nil {
return err
}
storedNodePool := nodePool.DeepCopy()
if nodePool.StatusConditions().SetTrue(v1.ConditionTypeNodeRegistrationHealthy) {
// We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
// can cause races due to the fact that it fully replaces the list on a change
// Here, we are updating the status condition list
if err := r.kubeClient.Status().Patch(ctx, nodePool, client.MergeFromWithOptions(storedNodePool, client.MergeFromWithOptimisticLock{})); client.IgnoreNotFound(err) != nil {
return err
}
}
}
return nil
}

func (r *Registration) syncNode(ctx context.Context, nodeClaim *v1.NodeClaim, node *corev1.Node) error {
stored := node.DeepCopy()
controllerutil.AddFinalizer(node, v1.TerminationFinalizer)
35 changes: 35 additions & 0 deletions pkg/controllers/nodeclaim/lifecycle/registration_test.go
@@ -17,6 +17,10 @@ limitations under the License.
package lifecycle_test

import (
"time"

"github.com/awslabs/operatorpkg/status"
operatorpkg "github.com/awslabs/operatorpkg/test/expectations"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
@@ -54,6 +58,7 @@ var _ = Describe("Registration", func() {
})
}
nodeClaim := test.NodeClaim(nodeClaimOpts...)
nodePool.StatusConditions().SetUnknown(v1.ConditionTypeNodeRegistrationHealthy)
ExpectApplied(ctx, env.Client, nodePool, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
@@ -66,6 +71,10 @@ var _ = Describe("Registration", func() {
if isManagedNodeClaim {
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeRegistered).IsTrue()).To(BeTrue())
Expect(nodeClaim.Status.NodeName).To(Equal(node.Name))
operatorpkg.ExpectStatusConditions(ctx, env.Client, 1*time.Minute, nodePool, status.Condition{
Type: v1.ConditionTypeNodeRegistrationHealthy,
Status: metav1.ConditionTrue,
})
} else {
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeRegistered).IsUnknown()).To(BeTrue())
Expect(nodeClaim.Status.NodeName).To(Equal(""))
@@ -368,4 +377,30 @@ var _ = Describe("Registration", func() {
node = ExpectExists(ctx, env.Client, node)
Expect(node.Spec.Taints).To(HaveLen(0))
})
It("should add NodeRegistrationHealthy=true on the nodePool if registration succeeds and if it was previously false", func() {
nodeClaimOpts := []v1.NodeClaim{{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
v1.NodePoolLabelKey: nodePool.Name,
},
},
}}
nodeClaim := test.NodeClaim(nodeClaimOpts...)
nodePool.StatusConditions().SetFalse(v1.ConditionTypeNodeRegistrationHealthy, "unhealthy", "unhealthy")
ExpectApplied(ctx, env.Client, nodePool, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)

node := test.Node(test.NodeOptions{ProviderID: nodeClaim.Status.ProviderID, Taints: []corev1.Taint{v1.UnregisteredNoExecuteTaint}})
ExpectApplied(ctx, env.Client, node)
ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim)

nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeRegistered).IsTrue()).To(BeTrue())
Expect(nodeClaim.Status.NodeName).To(Equal(node.Name))
operatorpkg.ExpectStatusConditions(ctx, env.Client, 1*time.Minute, nodePool, status.Condition{
Type: v1.ConditionTypeNodeRegistrationHealthy,
Status: metav1.ConditionTrue,
})
})
})
9 changes: 2 additions & 7 deletions pkg/controllers/nodepool/readiness/controller.go
@@ -19,9 +19,7 @@ package readiness
import (
"context"

"github.com/awslabs/operatorpkg/object"
"github.com/awslabs/operatorpkg/status"
"github.com/samber/lo"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
controllerruntime "sigs.k8s.io/controller-runtime"
@@ -55,11 +53,8 @@ func (c *Controller) Reconcile(ctx context.Context, nodePool *v1.NodePool) (reco
ctx = injection.WithControllerName(ctx, "nodepool.readiness")
stored := nodePool.DeepCopy()

nodeClass, ok := lo.Find(c.cloudProvider.GetSupportedNodeClasses(), func(nc status.Object) bool {
return object.GVK(nc).GroupKind() == nodePool.Spec.Template.Spec.NodeClassRef.GroupKind()
})
if !ok {
// Ignore NodePools which aren't using a supported NodeClass.
nodeClass := nodepoolutils.GetNodeClassStatusObject(nodePool, c.cloudProvider)
if nodeClass == nil {
return reconcile.Result{}, nil
}

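The readiness controller above now resolves the NodeClass through a shared nodepoolutils helper instead of the inline lo.Find. The helper's own file is not included in this excerpt; reconstructed from the code it replaces, its shape is presumably close to the following (an assumption, not the committed implementation):

// Assumed shape of nodepoolutils.GetNodeClassStatusObject, reconstructed from the inline code it replaces.
func GetNodeClassStatusObject(nodePool *v1.NodePool, cloudProvider cloudprovider.CloudProvider) status.Object {
	nodeClass, ok := lo.Find(cloudProvider.GetSupportedNodeClasses(), func(nc status.Object) bool {
		return object.GVK(nc).GroupKind() == nodePool.Spec.Template.Spec.NodeClassRef.GroupKind()
	})
	if !ok {
		// The NodePool references a NodeClass kind this cloud provider does not support.
		return nil
	}
	return nodeClass
}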