chore: add NodeRegistrationHealthy status condition to nodepool
jigisha620 committed Feb 8, 2025
1 parent 8e8b99d commit 03a7d60
Showing 10 changed files with 361 additions and 13 deletions.
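
In summary, this commit adds a NodeRegistrationHealthy status condition and a nodeClassObservedGeneration field to the NodePool status, introduces a nodepool/registrationhealth controller that resets the condition when the NodePool or its NodeClass changes, and updates the nodeclaim lifecycle initialization and liveness reconcilers to set the condition to True or False. The following is a rough consumer-side sketch of what the condition conveys; it is illustrative only, assumes the nil-tolerant condition accessors used elsewhere in the codebase, and describeRegistrationHealth is a hypothetical helper that is not part of this commit.

    // Illustrative sketch only; not part of this commit.
    package example

    import (
        "fmt"

        v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
    )

    // describeRegistrationHealth is a hypothetical helper showing what the new condition conveys.
    func describeRegistrationHealth(nodePool *v1.NodePool) string {
        cond := nodePool.StatusConditions().Get(v1.ConditionTypeNodeRegistrationHealthy)
        switch {
        case cond.IsTrue():
            return "healthy: a NodeClaim for this NodePool launched and registered successfully"
        case cond.IsUnknown():
            return "unknown: no signal yet, or the NodePool/NodeClass was updated and the condition was reset"
        default:
            // False: registration is failing; the reason and message carry the failure details.
            return fmt.Sprintf("unhealthy: %s: %s", cond.Reason, cond.Message)
        }
    }
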
6 changes: 6 additions & 0 deletions kwok/charts/crds/karpenter.sh_nodepools.yaml
@@ -498,6 +498,12 @@ spec:
- type
type: object
type: array
nodeClassObservedGeneration:
description: |-
NodeClassObservedGeneration represents the observed generation of the referenced nodeClass. If this does not match
the actual NodeClass generation, the NodeRegistrationHealthy status condition on the NodePool will be reset
format: int64
type: integer
resources:
additionalProperties:
anyOf:
6 changes: 6 additions & 0 deletions pkg/apis/crds/karpenter.sh_nodepools.yaml
@@ -496,6 +496,12 @@ spec:
- type
type: object
type: array
nodeClassObservedGeneration:
description: |-
NodeClassObservedGeneration represents the observed generation of the referenced nodeClass. If this does not match
the actual NodeClass generation, the NodeRegistrationHealthy status condition on the NodePool will be reset
format: int64
type: integer
resources:
additionalProperties:
anyOf:
6 changes: 6 additions & 0 deletions pkg/apis/v1/nodepool_status.go
@@ -27,13 +27,19 @@ const (
ConditionTypeValidationSucceeded = "ValidationSucceeded"
// ConditionTypeNodeClassReady = "NodeClassReady" condition indicates that underlying nodeClass was resolved and is reporting as Ready
ConditionTypeNodeClassReady = "NodeClassReady"
// ConditionTypeNodeRegistrationHealthy = "NodeRegistrationHealthy" condition indicates whether a misconfiguration exists that is preventing successful node launches/registrations and requires manual investigation
ConditionTypeNodeRegistrationHealthy = "NodeRegistrationHealthy"
)

// NodePoolStatus defines the observed state of NodePool
type NodePoolStatus struct {
// Resources is the list of resources that have been provisioned.
// +optional
Resources v1.ResourceList `json:"resources,omitempty"`
// NodeClassObservedGeneration represents the observed generation of the referenced nodeClass. If this does not match
// the actual NodeClass generation, the NodeRegistrationHealthy status condition on the NodePool will be reset
// +optional
NodeClassObservedGeneration int64 `json:"nodeClassObservedGeneration,omitempty"`
// Conditions contains signals for health and readiness
// +optional
Conditions []status.Condition `json:"conditions,omitempty"`
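
The reset behaviour described for NodeClassObservedGeneration is driven by the generation comparison in the new registrationhealth controller further down in this commit. Below is a minimal sketch of that comparison, pulled out into a hypothetical helper for illustration; shouldResetRegistrationHealth is not a function in the commit, and the real controller inlines this logic before setting the condition to Unknown.

    // Sketch only: the generation checks behind the "will be reset" wording above.
    package example

    import (
        "github.com/awslabs/operatorpkg/status"

        v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
    )

    // shouldResetRegistrationHealth is a hypothetical helper; when it returns true the
    // registrationhealth controller resets NodeRegistrationHealthy to Unknown.
    func shouldResetRegistrationHealth(nodePool *v1.NodePool, nodeClass status.Object) bool {
        return nodePool.Status.NodeClassObservedGeneration != nodeClass.GetGeneration() ||
            nodePool.Generation != nodePool.StatusConditions().Get(v1.ConditionTypeNodeRegistrationHealthy).ObservedGeneration
    }
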
2 changes: 2 additions & 0 deletions pkg/controllers/controllers.go
@@ -50,6 +50,7 @@ import (
nodepoolcounter "sigs.k8s.io/karpenter/pkg/controllers/nodepool/counter"
nodepoolhash "sigs.k8s.io/karpenter/pkg/controllers/nodepool/hash"
nodepoolreadiness "sigs.k8s.io/karpenter/pkg/controllers/nodepool/readiness"
nodepoolregistrationhealth "sigs.k8s.io/karpenter/pkg/controllers/nodepool/registrationhealth"
nodepoolvalidation "sigs.k8s.io/karpenter/pkg/controllers/nodepool/validation"
"sigs.k8s.io/karpenter/pkg/controllers/provisioning"
"sigs.k8s.io/karpenter/pkg/controllers/state"
@@ -88,6 +89,7 @@ func NewControllers(
metricsnodepool.NewController(kubeClient, cloudProvider),
metricsnode.NewController(cluster),
nodepoolreadiness.NewController(kubeClient, cloudProvider),
nodepoolregistrationhealth.NewController(kubeClient, cloudProvider),
nodepoolcounter.NewController(kubeClient, cloudProvider, cluster),
nodepoolvalidation.NewController(kubeClient, cloudProvider),
podevents.NewController(clock, kubeClient, cloudProvider),
62 changes: 50 additions & 12 deletions pkg/controllers/nodeclaim/lifecycle/initialization.go
@@ -20,6 +20,9 @@ import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"

"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
@@ -54,38 +57,73 @@ func (i *Initialization) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (reconcile.Result, error) {
return reconcile.Result{}, nil
}
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("provider-id", nodeClaim.Status.ProviderID))
requeue, err := i.updateNodePoolRegistrationHealth(ctx, nodeClaim)
if requeue {
return reconcile.Result{Requeue: true}, nil
}
if err != nil {
return reconcile.Result{}, err
}
node, err := nodeclaimutils.NodeForNodeClaim(ctx, i.kubeClient, nodeClaim)
if err != nil {
nodeClaim.StatusConditions().SetUnknownWithReason(v1.ConditionTypeInitialized, "NodeNotFound", "Node not registered with cluster")
return reconcile.Result{}, nil //nolint:nilerr
}
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("Node", klog.KRef("", node.Name)))
if updateNodeClaimInitializedUnknown(node, nodeClaim) {
return reconcile.Result{}, nil
}
stored := node.DeepCopy()
node.Labels = lo.Assign(node.Labels, map[string]string{v1.NodeInitializedLabelKey: "true"})
if !equality.Semantic.DeepEqual(stored, node) {
if err = i.kubeClient.Patch(ctx, node, client.MergeFrom(stored)); err != nil {
return reconcile.Result{}, err
}
}
log.FromContext(ctx).WithValues("allocatable", node.Status.Allocatable).Info("initialized nodeclaim")
nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeInitialized)
return reconcile.Result{}, nil
}

func updateNodeClaimInitializedUnknown(node *corev1.Node, nodeClaim *v1.NodeClaim) bool {
if nodeutils.GetCondition(node, corev1.NodeReady).Status != corev1.ConditionTrue {
nodeClaim.StatusConditions().SetUnknownWithReason(v1.ConditionTypeInitialized, "NodeNotReady", "Node status is NotReady")
return true
}
if taint, ok := StartupTaintsRemoved(node, nodeClaim); !ok {
nodeClaim.StatusConditions().SetUnknownWithReason(v1.ConditionTypeInitialized, "StartupTaintsExist", fmt.Sprintf("StartupTaint %q still exists", formatTaint(taint)))
return true
}
if taint, ok := KnownEphemeralTaintsRemoved(node); !ok {
nodeClaim.StatusConditions().SetUnknownWithReason(v1.ConditionTypeInitialized, "KnownEphemeralTaintsExist", fmt.Sprintf("KnownEphemeralTaint %q still exists", formatTaint(taint)))
return true
}
if name, ok := RequestedResourcesRegistered(node, nodeClaim); !ok {
nodeClaim.StatusConditions().SetUnknownWithReason(v1.ConditionTypeInitialized, "ResourceNotRegistered", fmt.Sprintf("Resource %q was requested but not registered", name))
return true
}
return false
}

func (i *Initialization) updateNodePoolRegistrationHealth(ctx context.Context, nodeClaim *v1.NodeClaim) (bool, error) {
nodePool := &v1.NodePool{}
if err := i.kubeClient.Get(ctx, types.NamespacedName{Name: nodeClaim.Labels[v1.NodePoolLabelKey]}, nodePool); err != nil {
return false, err
}
storedNodePool := nodePool.DeepCopy()
nodePool.StatusConditions().SetTrue(v1.ConditionTypeNodeRegistrationHealthy)
if !equality.Semantic.DeepEqual(storedNodePool, nodePool) {
// We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
// can cause races due to the fact that it fully replaces the list on a change
// Here, we are updating the status condition list
if err := i.kubeClient.Status().Patch(ctx, nodePool, client.MergeFromWithOptions(storedNodePool, client.MergeFromWithOptimisticLock{})); client.IgnoreNotFound(err) != nil {
if errors.IsConflict(err) {
return true, nil
}
return false, err
}
}
return false, nil
}

// KnownEphemeralTaintsRemoved validates whether all the ephemeral taints are removed
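
The status-patch idiom above (deep-copy, mutate the conditions, patch only the status with client.MergeFromWithOptimisticLock, and requeue on conflict) is repeated in the liveness reconciler and the new registrationhealth controller below. Here is a generic, self-contained sketch of that idiom for reference; patchNodePoolStatus and its signature are illustrative and not part of the commit.

    // Generic sketch of the optimistic-lock status patch used three times in this commit; names are illustrative.
    package example

    import (
        "context"

        "k8s.io/apimachinery/pkg/api/equality"
        "k8s.io/apimachinery/pkg/api/errors"
        "sigs.k8s.io/controller-runtime/pkg/client"

        v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
    )

    // patchNodePoolStatus reports whether the caller should requeue (on a resourceVersion conflict).
    func patchNodePoolStatus(ctx context.Context, kubeClient client.Client, stored, nodePool *v1.NodePool) (requeue bool, err error) {
        if equality.Semantic.DeepEqual(stored, nodePool) {
            return false, nil
        }
        // The optimistic lock adds a resourceVersion precondition, so a concurrent writer updating
        // the conditions list surfaces as a conflict instead of being silently overwritten.
        patchErr := kubeClient.Status().Patch(ctx, nodePool, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{}))
        if client.IgnoreNotFound(patchErr) == nil {
            return false, nil
        }
        if errors.IsConflict(patchErr) {
            return true, nil // retry against a fresh copy of the NodePool
        }
        return false, patchErr
    }
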
2 changes: 2 additions & 0 deletions pkg/controllers/nodeclaim/lifecycle/initialization_test.go
@@ -202,6 +202,8 @@ var _ = Describe("Initialization", func() {
ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim)
ExpectMakeNodesReady(ctx, env.Client, node) // Remove the not-ready taint
ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim)
nodePool = ExpectExists(ctx, env.Client, nodePool)
Expect(ExpectStatusConditionExists(nodePool, v1.ConditionTypeNodeRegistrationHealthy).Status).To(Equal(metav1.ConditionTrue))

node = ExpectExists(ctx, env.Client, node)
Expect(node.Labels).To(HaveKeyWithValue(v1.NodeInitializedLabelKey, "true"))
30 changes: 29 additions & 1 deletion pkg/controllers/nodeclaim/lifecycle/liveness.go
@@ -20,9 +20,13 @@ import (
"context"
"time"

"k8s.io/apimachinery/pkg/api/errors"

"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/log"

"k8s.io/utils/clock"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
@@ -51,6 +55,30 @@ func (l *Liveness) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (reconcile.Result, error) {
if ttl := registrationTTL - l.clock.Since(registered.LastTransitionTime.Time); ttl > 0 {
return reconcile.Result{RequeueAfter: ttl}, nil
}
nodePool := &v1.NodePool{}
if err := l.kubeClient.Get(ctx, types.NamespacedName{Name: nodeClaim.Labels[v1.NodePoolLabelKey]}, nodePool); err != nil {
return reconcile.Result{}, err
}
if nodePool.StatusConditions().Get(v1.ConditionTypeNodeRegistrationHealthy).IsUnknown() {
stored := nodePool.DeepCopy()
// If the nodeClaim failed to launch/register during the TTL, set the NodeRegistrationHealthy status condition on
// the NodePool to False. If the launch failed, propagate the launch failure reason and message from the nodeClaim.
if nodeClaim.StatusConditions().IsTrue(v1.ConditionTypeLaunched) {
nodePool.StatusConditions().SetFalse(v1.ConditionTypeNodeRegistrationHealthy, "Unhealthy", "Failed to register node")
} else {
launchFailure := nodeClaim.StatusConditions().Get(v1.ConditionTypeLaunched)
nodePool.StatusConditions().SetFalse(v1.ConditionTypeNodeRegistrationHealthy, launchFailure.Reason, launchFailure.Message)
}
// We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
// can cause races due to the fact that it fully replaces the list on a change
// Here, we are updating the status condition list
if err := l.kubeClient.Status().Patch(ctx, nodePool, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); client.IgnoreNotFound(err) != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, err
}
}
// Delete the NodeClaim if we believe the NodeClaim won't register since we haven't seen the node
if err := l.kubeClient.Delete(ctx, nodeClaim); err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
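
As the comment in the liveness change notes, the reason and message placed on the NodePool depend on whether the NodeClaim got as far as launching. A small sketch of that branch, extracted into a hypothetical helper for illustration; registrationFailureReason is not a function in the commit.

    // Sketch only: which reason/message liveness puts on NodeRegistrationHealthy when the registration TTL expires.
    package example

    import (
        v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
    )

    // registrationFailureReason is a hypothetical helper mirroring the branch in Liveness.Reconcile above.
    func registrationFailureReason(nodeClaim *v1.NodeClaim) (reason, message string) {
        if nodeClaim.StatusConditions().IsTrue(v1.ConditionTypeLaunched) {
            // The instance launched but the node never registered with the cluster.
            return "Unhealthy", "Failed to register node"
        }
        // The launch itself failed; surface the launch failure's reason and message.
        launched := nodeClaim.StatusConditions().Get(v1.ConditionTypeLaunched)
        return launched.Reason, launched.Message
    }
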
44 changes: 44 additions & 0 deletions pkg/controllers/nodeclaim/lifecycle/liveness_test.go
@@ -20,6 +20,7 @@ import (
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -78,6 +79,11 @@ var _ = Describe("Liveness", func() {
ExpectFinalizersRemoved(ctx, env.Client, nodeClaim)
if isManagedNodeClaim {
ExpectNotFound(ctx, env.Client, nodeClaim)
nodePool = ExpectExists(ctx, env.Client, nodePool)
nodeRegistrationHealthySC := ExpectStatusConditionExists(nodePool, v1.ConditionTypeNodeRegistrationHealthy)
Expect(nodeRegistrationHealthySC.Status).To(Equal(metav1.ConditionFalse))
Expect(nodeRegistrationHealthySC.Reason).To(Equal("Unhealthy"))
Expect(nodeRegistrationHealthySC.Message).To(Equal("Failed to register node"))
} else {
ExpectExists(ctx, env.Client, nodeClaim)
}
@@ -141,6 +147,44 @@
// If the node hasn't registered in the registration timeframe, then we deprovision the nodeClaim
fakeClock.Step(time.Minute * 20)
_ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim)
nodePool = ExpectExists(ctx, env.Client, nodePool)
nodeRegistrationHealthySC := ExpectStatusConditionExists(nodePool, v1.ConditionTypeNodeRegistrationHealthy)
Expect(nodeRegistrationHealthySC.Status).To(Equal(metav1.ConditionFalse))
Expect(nodeRegistrationHealthySC.Reason).To(Equal(nodeClaim.StatusConditions().Get(v1.ConditionTypeLaunched).Reason))
Expect(nodeRegistrationHealthySC.Message).To(Equal(nodeClaim.StatusConditions().Get(v1.ConditionTypeLaunched).Message))
ExpectFinalizersRemoved(ctx, env.Client, nodeClaim)
ExpectNotFound(ctx, env.Client, nodeClaim)
})
It("should not update NodeRegistrationHealthy status condition if it is already set to True", func() {
nodePool.StatusConditions().SetTrue(v1.ConditionTypeNodeRegistrationHealthy)
nodeClaim := test.NodeClaim(v1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
v1.NodePoolLabelKey: nodePool.Name,
},
},
Spec: v1.NodeClaimSpec{
Resources: v1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourceMemory: resource.MustParse("50Mi"),
corev1.ResourcePods: resource.MustParse("5"),
fake.ResourceGPUVendorA: resource.MustParse("1"),
},
},
},
})
cloudProvider.AllowedCreateCalls = 0 // Don't allow Create() calls to succeed
ExpectApplied(ctx, env.Client, nodePool, nodeClaim)
_ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)

// If the node hasn't registered in the registration timeframe, then we deprovision the nodeClaim
fakeClock.Step(time.Minute * 20)
_ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim)
nodePool = ExpectExists(ctx, env.Client, nodePool)
// NodeClaim registration failed, but we should not update the NodeRegistrationHealthy status condition if it is already True
Expect(ExpectStatusConditionExists(nodePool, v1.ConditionTypeNodeRegistrationHealthy).Status).To(Equal(metav1.ConditionTrue))
ExpectFinalizersRemoved(ctx, env.Client, nodeClaim)
ExpectNotFound(ctx, env.Client, nodeClaim)
})
101 changes: 101 additions & 0 deletions pkg/controllers/nodepool/registrationhealth/controller.go
@@ -0,0 +1,101 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package registrationhealth

import (
"context"

"github.com/awslabs/operatorpkg/object"
"github.com/awslabs/operatorpkg/status"
"github.com/samber/lo"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"

"sigs.k8s.io/karpenter/pkg/operator/injection"

controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
nodepoolutils "sigs.k8s.io/karpenter/pkg/utils/nodepool"
)

// Controller for the resource
type Controller struct {
kubeClient client.Client
cloudProvider cloudprovider.CloudProvider
}

// NewController is a constructor
func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider) *Controller {
return &Controller{
kubeClient: kubeClient,
cloudProvider: cloudProvider,
}
}

func (c *Controller) Reconcile(ctx context.Context, nodePool *v1.NodePool) (reconcile.Result, error) {
ctx = injection.WithControllerName(ctx, "nodepool.healthyregistration")
stored := nodePool.DeepCopy()

nodeClass, ok := lo.Find(c.cloudProvider.GetSupportedNodeClasses(), func(nc status.Object) bool {
return object.GVK(nc).GroupKind() == nodePool.Spec.Template.Spec.NodeClassRef.GroupKind()
})
if !ok {
// Ignore NodePools which aren't using a supported NodeClass.
return reconcile.Result{}, nil
}
if err := c.kubeClient.Get(ctx, client.ObjectKey{Name: nodePool.Spec.Template.Spec.NodeClassRef.Name}, nodeClass); err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}

// If the NodeClass or NodePool has been updated, reset NodeRegistrationHealthy to Unknown
if (nodePool.Status.NodeClassObservedGeneration != nodeClass.GetGeneration()) ||
(nodePool.Generation != nodePool.StatusConditions().Get(v1.ConditionTypeNodeRegistrationHealthy).ObservedGeneration) {
nodePool.StatusConditions().SetUnknown(v1.ConditionTypeNodeRegistrationHealthy)
nodePool.Status.NodeClassObservedGeneration = nodeClass.GetGeneration()
}

if !equality.Semantic.DeepEqual(stored, nodePool) {
// We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
// can cause races due to the fact that it fully replaces the list on a change
// Here, we are updating the status condition list
if err := c.kubeClient.Status().Patch(ctx, nodePool, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); client.IgnoreNotFound(err) != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, err
}
}
return reconcile.Result{}, nil
}

func (c *Controller) Register(_ context.Context, m manager.Manager) error {
b := controllerruntime.NewControllerManagedBy(m).
Named("nodepool.healthyregistration").
For(&v1.NodePool{}, builder.WithPredicates(nodepoolutils.IsManagedPredicateFuncs(c.cloudProvider))).
WithOptions(controller.Options{MaxConcurrentReconciles: 10})
for _, nodeClass := range c.cloudProvider.GetSupportedNodeClasses() {
b.Watches(nodeClass, nodepoolutils.NodeClassEventHandler(c.kubeClient))
}
return b.Complete(reconcile.AsReconciler(m.GetClient(), c))
}