feat: add NodeRegistrationHealthy status condition to nodepool #1969

Open · wants to merge 1 commit into base: main
6 changes: 6 additions & 0 deletions kwok/charts/crds/karpenter.sh_nodepools.yaml
@@ -498,6 +498,12 @@ spec:
- type
type: object
type: array
nodeClassObservedGeneration:
description: |-
NodeClassObservedGeneration represents the observed generation of the referenced NodeClass. If this does not match
the actual NodeClass generation, the NodeRegistrationHealthy status condition on the NodePool will be reset
format: int64
type: integer
resources:
additionalProperties:
anyOf:
6 changes: 6 additions & 0 deletions pkg/apis/crds/karpenter.sh_nodepools.yaml
@@ -496,6 +496,12 @@ spec:
- type
type: object
type: array
nodeClassObservedGeneration:
description: |-
NodeClassObservedGeneration represents the observed generation of the referenced NodeClass. If this does not match
the actual NodeClass generation, the NodeRegistrationHealthy status condition on the NodePool will be reset
format: int64
type: integer
resources:
additionalProperties:
anyOf:
6 changes: 6 additions & 0 deletions pkg/apis/v1/nodepool_status.go
@@ -27,13 +27,19 @@ const (
ConditionTypeValidationSucceeded = "ValidationSucceeded"
// ConditionTypeNodeClassReady = "NodeClassReady" condition indicates that underlying nodeClass was resolved and is reporting as Ready
ConditionTypeNodeClassReady = "NodeClassReady"
// ConditionTypeNodeRegistrationHealthy = "NodeRegistrationHealthy" condition indicates whether a misconfiguration exists that prevents successful node launches/registrations and requires manual investigation
ConditionTypeNodeRegistrationHealthy = "NodeRegistrationHealthy"
)

// NodePoolStatus defines the observed state of NodePool
type NodePoolStatus struct {
// Resources is the list of resources that have been provisioned.
// +optional
Resources v1.ResourceList `json:"resources,omitempty"`
// NodeClassObservedGeneration represents the observed generation of the referenced NodeClass. If this does not match
// the actual NodeClass generation, the NodeRegistrationHealthy status condition on the NodePool will be reset
// +optional
NodeClassObservedGeneration int64 `json:"nodeClassObservedGeneration,omitempty"`
// Conditions contains signals for health and readiness
// +optional
Conditions []status.Condition `json:"conditions,omitempty"`
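The new field is consumed by the nodepool registrationhealth controller wired up in the next file. Below is a minimal sketch of the reset logic implied by the field description; the body is reconstructed from the docs rather than copied from the PR, and it assumes the controller carries kubeClient/cloudProvider fields like the readiness controller does.

```go
// Sketch only: reconstructed from the NodeClassObservedGeneration description, not the
// PR's actual registrationhealth controller. Helper names follow the rest of this PR
// (nodepoolutils.GetNodeClassStatusObject appears in the readiness diff further down).
func (c *Controller) Reconcile(ctx context.Context, nodePool *v1.NodePool) (reconcile.Result, error) {
	nodeClass := nodepoolutils.GetNodeClassStatusObject(nodePool, c.cloudProvider)
	if nodeClass == nil {
		return reconcile.Result{}, nil
	}
	stored := nodePool.DeepCopy()
	// A NodeClass edit may fix (or introduce) the misconfiguration behind the previous
	// NodeRegistrationHealthy verdict, so a generation mismatch resets the condition to Unknown.
	if nodePool.Status.NodeClassObservedGeneration != nodeClass.GetGeneration() {
		nodePool.StatusConditions().SetUnknown(v1.ConditionTypeNodeRegistrationHealthy)
		nodePool.Status.NodeClassObservedGeneration = nodeClass.GetGeneration()
	}
	if !equality.Semantic.DeepEqual(stored, nodePool) {
		// Optimistic lock so a concurrent status writer is not silently overwritten.
		if err := c.kubeClient.Status().Patch(ctx, nodePool, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
			return reconcile.Result{}, client.IgnoreNotFound(err)
		}
	}
	return reconcile.Result{}, nil
}
```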
2 changes: 2 additions & 0 deletions pkg/controllers/controllers.go
@@ -50,6 +50,7 @@ import (
nodepoolcounter "sigs.k8s.io/karpenter/pkg/controllers/nodepool/counter"
nodepoolhash "sigs.k8s.io/karpenter/pkg/controllers/nodepool/hash"
nodepoolreadiness "sigs.k8s.io/karpenter/pkg/controllers/nodepool/readiness"
nodepoolregistrationhealth "sigs.k8s.io/karpenter/pkg/controllers/nodepool/registrationhealth"
nodepoolvalidation "sigs.k8s.io/karpenter/pkg/controllers/nodepool/validation"
"sigs.k8s.io/karpenter/pkg/controllers/provisioning"
"sigs.k8s.io/karpenter/pkg/controllers/state"
@@ -88,6 +89,7 @@ func NewControllers(
metricsnodepool.NewController(kubeClient, cloudProvider),
metricsnode.NewController(cluster),
nodepoolreadiness.NewController(kubeClient, cloudProvider),
nodepoolregistrationhealth.NewController(kubeClient, cloudProvider),
nodepoolcounter.NewController(kubeClient, cloudProvider, cluster),
nodepoolvalidation.NewController(kubeClient, cloudProvider),
podevents.NewController(clock, kubeClient, cloudProvider),
2 changes: 1 addition & 1 deletion pkg/controllers/metrics/pod/controller.go
@@ -252,7 +252,7 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco
})
c.recordPodSchedulingUndecidedMetric(pod)
// Get the time for when we Karpenter first thought the pod was schedulable. This should be zero if we didn't simulate for this pod.
schedulableTime := c.cluster.PodSchedulingSuccessTime(types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace})
schedulableTime := c.cluster.PodSchedulingSuccessTime(types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, false)
Review comment (Member): Can we just create a separate function to access the PodSchedulingNodeRegistrationHealthySuccessTime or something like this -- I think a boolean here is a bit hard to reason about

c.recordPodStartupMetric(pod, schedulableTime)
c.recordPodBoundMetric(pod, schedulableTime)
// Requeue every 30s for pods that are stuck without a state change
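One way to act on that review comment is to split the boolean-flagged lookup into two explicitly named accessors. The sketch below is hypothetical: the struct fields and the second method name are assumptions, not code from this PR or from pkg/controllers/state.

```go
package state

import (
	"sync"
	"time"

	"k8s.io/apimachinery/pkg/types"
)

// Hypothetical slice of the cluster state; the real Cluster type has many more fields.
type Cluster struct {
	mu                         sync.RWMutex
	podSchedulingSuccessTime   map[types.NamespacedName]time.Time // first time scheduling was simulated successfully
	podRegistrationHealthyTime map[types.NamespacedName]time.Time // bookkeeping used for NodeRegistrationHealthy metrics
}

// PodSchedulingSuccessTime keeps its original single-purpose signature.
func (c *Cluster) PodSchedulingSuccessTime(key types.NamespacedName) time.Time {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.podSchedulingSuccessTime[key]
}

// PodSchedulingNodeRegistrationHealthySuccessTime replaces the boolean parameter with a
// separately named accessor, which reads more clearly at the call site.
func (c *Cluster) PodSchedulingNodeRegistrationHealthySuccessTime(key types.NamespacedName) time.Time {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.podRegistrationHealthyTime[key]
}
```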
8 changes: 4 additions & 4 deletions pkg/controllers/metrics/pod/suite_test.go
@@ -105,7 +105,7 @@ var _ = Describe("Pod Metrics", func() {
p.Status.Phase = corev1.PodPending

fakeClock.Step(1 * time.Hour)
cluster.MarkPodSchedulingDecisions(map[*corev1.Pod]error{}, p)
cluster.MarkPodSchedulingDecisions(ctx, map[*corev1.Pod]error{}, p)

// PodScheduled condition does not exist, emit pods_unbound_time_seconds metric
ExpectApplied(ctx, env.Client, p)
@@ -182,7 +182,7 @@ var _ = Describe("Pod Metrics", func() {
p.Status.Phase = corev1.PodPending

fakeClock.Step(1 * time.Hour)
cluster.MarkPodSchedulingDecisions(map[*corev1.Pod]error{}, p)
cluster.MarkPodSchedulingDecisions(ctx, map[*corev1.Pod]error{}, p)
ExpectApplied(ctx, env.Client, p)
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) //This will add pod to pending pods and unscheduled pods set
_, found := FindMetricWithLabelValues("karpenter_pods_unstarted_time_seconds", map[string]string{
@@ -271,7 +271,7 @@ var _ = Describe("Pod Metrics", func() {
})
Expect(found).To(BeTrue())

cluster.MarkPodSchedulingDecisions(map[*corev1.Pod]error{}, p)
cluster.MarkPodSchedulingDecisions(ctx, map[*corev1.Pod]error{}, p)
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p))

_, found = FindMetricWithLabelValues("karpenter_pods_scheduling_undecided_time_seconds", map[string]string{
@@ -318,7 +318,7 @@ var _ = Describe("Pod Metrics", func() {
p.Status.Phase = corev1.PodPending
ExpectApplied(ctx, env.Client, p)

cluster.MarkPodSchedulingDecisions(map[*corev1.Pod]error{}, p)
cluster.MarkPodSchedulingDecisions(ctx, map[*corev1.Pod]error{}, p)
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p))

_, found := FindMetricWithLabelValues("karpenter_pods_unbound_time_seconds", map[string]string{
43 changes: 41 additions & 2 deletions pkg/controllers/nodeclaim/lifecycle/liveness.go
@@ -20,9 +20,14 @@ import (
"context"
"time"

"k8s.io/apimachinery/pkg/api/errors"

"k8s.io/apimachinery/pkg/types"

"sigs.k8s.io/controller-runtime/pkg/log"

"k8s.io/utils/clock"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
@@ -51,6 +56,12 @@ func (l *Liveness) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (reco
if ttl := registrationTTL - l.clock.Since(registered.LastTransitionTime.Time); ttl > 0 {
return reconcile.Result{RequeueAfter: ttl}, nil
}
if err := l.updateNodePoolRegistrationHealth(ctx, nodeClaim); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, err
}
// Delete the NodeClaim if we believe the NodeClaim won't register since we haven't seen the node
if err := l.kubeClient.Delete(ctx, nodeClaim); err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
@@ -61,6 +72,34 @@ func (l *Liveness) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (reco
metrics.NodePoolLabel: nodeClaim.Labels[v1.NodePoolLabelKey],
metrics.CapacityTypeLabel: nodeClaim.Labels[v1.CapacityTypeLabelKey],
})

return reconcile.Result{}, nil
}

// updateNodePoolRegistrationHealth sets the NodeRegistrationHealthy=False
// on the NodePool if the nodeClaim fails to launch/register
func (l *Liveness) updateNodePoolRegistrationHealth(ctx context.Context, nodeClaim *v1.NodeClaim) error {
nodePoolName, ok := nodeClaim.Labels[v1.NodePoolLabelKey]
if ok && len(nodePoolName) != 0 {
Review comment (Member): nit: Just check nodePoolName != "" -- you don't even have to check ok, since if the label doesn't exist then it will just return an empty string -- and "" is an invalid name anyways

nodePool := &v1.NodePool{}
if err := l.kubeClient.Get(ctx, types.NamespacedName{Name: nodePoolName}, nodePool); err != nil {
Review comment (Member): Do you properly handle the NodePool NotFound error?

return err
}
if nodePool.StatusConditions().Get(v1.ConditionTypeNodeRegistrationHealthy).IsUnknown() {
stored := nodePool.DeepCopy()
// If the nodeClaim failed to register during the TTL, set the NodeRegistrationHealthy status condition on
// the NodePool to False. If the launch failed, use the launch failure reason and message from the nodeClaim.
if launchCondition := nodeClaim.StatusConditions().Get(v1.ConditionTypeLaunched); launchCondition.IsTrue() {
nodePool.StatusConditions().SetFalse(v1.ConditionTypeNodeRegistrationHealthy, "RegistrationFailed", "Failed to register node")
} else {
nodePool.StatusConditions().SetFalse(v1.ConditionTypeNodeRegistrationHealthy, launchCondition.Reason, launchCondition.Message)
}
// We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
// can cause races due to the fact that it fully replaces the list on a change
// Here, we are updating the status condition list
if err := l.kubeClient.Status().Patch(ctx, nodePool, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); client.IgnoreNotFound(err) != nil {
return err
}
}
}
return nil
}
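Folding the two suggestions above (drop the ok check, tolerate a missing NodePool) into the new helper could look roughly like this; the same shape applies to the mirror function in registration.go further down. This is a sketch against the imports already present in liveness.go, not the final implementation.

```go
func (l *Liveness) updateNodePoolRegistrationHealth(ctx context.Context, nodeClaim *v1.NodeClaim) error {
	nodePoolName := nodeClaim.Labels[v1.NodePoolLabelKey]
	if nodePoolName == "" {
		// An absent label also yields "", and "" is not a valid NodePool name anyway.
		return nil
	}
	nodePool := &v1.NodePool{}
	if err := l.kubeClient.Get(ctx, types.NamespacedName{Name: nodePoolName}, nodePool); err != nil {
		// The NodePool may have been deleted since the NodeClaim launched; nothing to update.
		return client.IgnoreNotFound(err)
	}
	if !nodePool.StatusConditions().Get(v1.ConditionTypeNodeRegistrationHealthy).IsUnknown() {
		return nil
	}
	stored := nodePool.DeepCopy()
	if launched := nodeClaim.StatusConditions().Get(v1.ConditionTypeLaunched); launched.IsTrue() {
		// Launch worked, so it is the registration that timed out.
		nodePool.StatusConditions().SetFalse(v1.ConditionTypeNodeRegistrationHealthy, "RegistrationFailed", "Failed to register node")
	} else {
		// Launch itself failed; surface its reason and message on the NodePool.
		nodePool.StatusConditions().SetFalse(v1.ConditionTypeNodeRegistrationHealthy, launched.Reason, launched.Message)
	}
	// Optimistic-lock merge patch so the condition list is not clobbered by concurrent writers.
	return client.IgnoreNotFound(l.kubeClient.Status().Patch(ctx, nodePool, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})))
}
```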
48 changes: 48 additions & 0 deletions pkg/controllers/nodeclaim/lifecycle/liveness_test.go
@@ -19,6 +19,9 @@ package lifecycle_test
import (
"time"

"github.com/awslabs/operatorpkg/status"

operatorpkg "github.com/awslabs/operatorpkg/test/expectations"
. "github.com/onsi/ginkgo/v2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
@@ -78,6 +81,12 @@ var _ = Describe("Liveness", func() {
ExpectFinalizersRemoved(ctx, env.Client, nodeClaim)
Review comment (Member): We should validate this: Do we have a test that we succeed when the NodeClaim has no owning NodePool?

if isManagedNodeClaim {
ExpectNotFound(ctx, env.Client, nodeClaim)
operatorpkg.ExpectStatusConditions(ctx, env.Client, 1*time.Minute, nodePool, status.Condition{
Type: v1.ConditionTypeNodeRegistrationHealthy,
Status: metav1.ConditionFalse,
Reason: "RegistrationFailed",
Message: "Failed to register node",
})
} else {
ExpectExists(ctx, env.Client, nodeClaim)
}
@@ -141,6 +150,45 @@ var _ = Describe("Liveness", func() {
// If the node hasn't registered in the registration timeframe, then we deprovision the nodeClaim
fakeClock.Step(time.Minute * 20)
_ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim)
operatorpkg.ExpectStatusConditions(ctx, env.Client, 1*time.Minute, nodePool, status.Condition{
Type: v1.ConditionTypeNodeRegistrationHealthy,
Status: metav1.ConditionFalse,
Reason: nodeClaim.StatusConditions().Get(v1.ConditionTypeLaunched).Reason,
Message: nodeClaim.StatusConditions().Get(v1.ConditionTypeLaunched).Message,
})
ExpectFinalizersRemoved(ctx, env.Client, nodeClaim)
ExpectNotFound(ctx, env.Client, nodeClaim)
})
It("should not update NodeRegistrationHealthy status condition if it is already set to True", func() {
nodePool.StatusConditions().SetTrue(v1.ConditionTypeNodeRegistrationHealthy)
nodeClaim := test.NodeClaim(v1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
v1.NodePoolLabelKey: nodePool.Name,
},
},
Spec: v1.NodeClaimSpec{
Resources: v1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourceMemory: resource.MustParse("50Mi"),
corev1.ResourcePods: resource.MustParse("5"),
fake.ResourceGPUVendorA: resource.MustParse("1"),
},
},
},
})
cloudProvider.AllowedCreateCalls = 0 // Don't allow Create() calls to succeed
ExpectApplied(ctx, env.Client, nodePool, nodeClaim)
_ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)

// If the node hasn't registered in the registration timeframe, then we deprovision the nodeClaim
fakeClock.Step(time.Minute * 20)
_ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim)

// NodeClaim registration failed, but we should not update the NodeRegistrationHealthy status condition if it is already True
operatorpkg.ExpectStatusConditions(ctx, env.Client, 1*time.Minute, nodePool, status.Condition{Type: v1.ConditionTypeNodeRegistrationHealthy, Status: metav1.ConditionTrue})
ExpectFinalizersRemoved(ctx, env.Client, nodeClaim)
ExpectNotFound(ctx, env.Client, nodeClaim)
})
30 changes: 30 additions & 0 deletions pkg/controllers/nodeclaim/lifecycle/registration.go
@@ -20,6 +20,8 @@ import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/types"

"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
@@ -82,9 +84,37 @@ func (r *Registration) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (
metrics.NodesCreatedTotal.Inc(map[string]string{
metrics.NodePoolLabel: nodeClaim.Labels[v1.NodePoolLabelKey],
})
if err := r.updateNodePoolRegistrationHealth(ctx, nodeClaim); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, err
}
return reconcile.Result{}, nil
}

// updateNodePoolRegistrationHealth sets the NodeRegistrationHealthy=True
// on the NodePool if the nodeClaim that registered is owned by a NodePool
func (r *Registration) updateNodePoolRegistrationHealth(ctx context.Context, nodeClaim *v1.NodeClaim) error {
nodePoolName, ok := nodeClaim.Labels[v1.NodePoolLabelKey]
if ok && len(nodePoolName) != 0 {
Review comment (Member): Same comment as in the liveness controller -- I would just check if the value is not equal to ""

nodePool := &v1.NodePool{}
if err := r.kubeClient.Get(ctx, types.NamespacedName{Name: nodePoolName}, nodePool); err != nil {
Review comment (Member): Handle NotFound error here too

return err
}
storedNodePool := nodePool.DeepCopy()
if nodePool.StatusConditions().SetTrue(v1.ConditionTypeNodeRegistrationHealthy) {
// We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
// can cause races due to the fact that it fully replaces the list on a change
// Here, we are updating the status condition list
if err := r.kubeClient.Status().Patch(ctx, nodePool, client.MergeFromWithOptions(storedNodePool, client.MergeFromWithOptimisticLock{})); client.IgnoreNotFound(err) != nil {
return err
}
}
}
return nil
}

func (r *Registration) syncNode(ctx context.Context, nodeClaim *v1.NodeClaim, node *corev1.Node) error {
stored := node.DeepCopy()
controllerutil.AddFinalizer(node, v1.TerminationFinalizer)
35 changes: 35 additions & 0 deletions pkg/controllers/nodeclaim/lifecycle/registration_test.go
@@ -17,6 +17,10 @@ limitations under the License.
package lifecycle_test

import (
"time"

"github.com/awslabs/operatorpkg/status"
operatorpkg "github.com/awslabs/operatorpkg/test/expectations"
. "github.com/onsi/ginkgo/v2"
Review comment (Member): We should validate this: Do we have a test that we succeed when the NodeClaim has no owning NodePool?

. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
@@ -54,6 +58,7 @@ var _ = Describe("Registration", func() {
})
}
nodeClaim := test.NodeClaim(nodeClaimOpts...)
nodePool.StatusConditions().SetUnknown(v1.ConditionTypeNodeRegistrationHealthy)
ExpectApplied(ctx, env.Client, nodePool, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
@@ -66,6 +71,10 @@
if isManagedNodeClaim {
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeRegistered).IsTrue()).To(BeTrue())
Expect(nodeClaim.Status.NodeName).To(Equal(node.Name))
operatorpkg.ExpectStatusConditions(ctx, env.Client, 1*time.Minute, nodePool, status.Condition{
Type: v1.ConditionTypeNodeRegistrationHealthy,
Status: metav1.ConditionTrue,
})
} else {
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeRegistered).IsUnknown()).To(BeTrue())
Expect(nodeClaim.Status.NodeName).To(Equal(""))
@@ -368,4 +377,30 @@ var _ = Describe("Registration", func() {
node = ExpectExists(ctx, env.Client, node)
Expect(node.Spec.Taints).To(HaveLen(0))
})
It("should add NodeRegistrationHealthy=true on the nodePool if registration succeeds and if it was previously false", func() {
nodeClaimOpts := []v1.NodeClaim{{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
v1.NodePoolLabelKey: nodePool.Name,
},
},
}}
nodeClaim := test.NodeClaim(nodeClaimOpts...)
nodePool.StatusConditions().SetFalse(v1.ConditionTypeNodeRegistrationHealthy, "unhealthy", "unhealthy")
ExpectApplied(ctx, env.Client, nodePool, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)

node := test.Node(test.NodeOptions{ProviderID: nodeClaim.Status.ProviderID, Taints: []corev1.Taint{v1.UnregisteredNoExecuteTaint}})
ExpectApplied(ctx, env.Client, node)
ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim)

nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeRegistered).IsTrue()).To(BeTrue())
Expect(nodeClaim.Status.NodeName).To(Equal(node.Name))
operatorpkg.ExpectStatusConditions(ctx, env.Client, 1*time.Minute, nodePool, status.Condition{
Type: v1.ConditionTypeNodeRegistrationHealthy,
Status: metav1.ConditionTrue,
})
})
})
9 changes: 2 additions & 7 deletions pkg/controllers/nodepool/readiness/controller.go
@@ -19,9 +19,7 @@ package readiness
import (
"context"

"github.com/awslabs/operatorpkg/object"
"github.com/awslabs/operatorpkg/status"
"github.com/samber/lo"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
controllerruntime "sigs.k8s.io/controller-runtime"
@@ -55,11 +53,8 @@ func (c *Controller) Reconcile(ctx context.Context, nodePool *v1.NodePool) (reco
ctx = injection.WithControllerName(ctx, "nodepool.readiness")
stored := nodePool.DeepCopy()

nodeClass, ok := lo.Find(c.cloudProvider.GetSupportedNodeClasses(), func(nc status.Object) bool {
return object.GVK(nc).GroupKind() == nodePool.Spec.Template.Spec.NodeClassRef.GroupKind()
})
if !ok {
// Ignore NodePools which aren't using a supported NodeClass.
nodeClass := nodepoolutils.GetNodeClassStatusObject(nodePool, c.cloudProvider)
if nodeClass == nil {
return reconcile.Result{}, nil
}

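The readiness controller now delegates NodeClass lookup to a shared helper in pkg/utils/nodepool. That file is not shown in this diff, so the sketch below simply reconstructs the helper from the lo.Find block it replaces; the real signature may differ.

```go
package nodepool

import (
	"github.com/awslabs/operatorpkg/object"
	"github.com/awslabs/operatorpkg/status"
	"github.com/samber/lo"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
	"sigs.k8s.io/karpenter/pkg/cloudprovider"
)

// GetNodeClassStatusObject returns the supported NodeClass status object referenced by the
// NodePool, or nil when the NodePool references a NodeClass kind the CloudProvider does not support.
func GetNodeClassStatusObject(nodePool *v1.NodePool, cloudProvider cloudprovider.CloudProvider) status.Object {
	nodeClass, ok := lo.Find(cloudProvider.GetSupportedNodeClasses(), func(nc status.Object) bool {
		return object.GVK(nc).GroupKind() == nodePool.Spec.Template.Spec.NodeClassRef.GroupKind()
	})
	if !ok {
		return nil
	}
	return nodeClass
}
```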