Skip to content

Commit

Permalink
[webhook][workload] Validate reclaimable pods
Browse files Browse the repository at this point in the history
  • Loading branch information
trasc committed May 11, 2023
1 parent 12e2c2e commit de1ca9e
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 0 deletions.
65 changes: 65 additions & 0 deletions apis/kueue/webhooks/workload_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package webhooks

import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"
apivalidation "k8s.io/apimachinery/pkg/api/validation"
Expand Down Expand Up @@ -121,6 +122,7 @@ func ValidateWorkload(obj *kueue.Workload) field.ErrorList {
}

allErrs = append(allErrs, metav1validation.ValidateConditions(obj.Status.Conditions, statusPath.Child("conditions"))...)
allErrs = append(allErrs, validateReclaimablePods(obj, statusPath.Child("reclaimablePods"))...)

return allErrs
}
Expand Down Expand Up @@ -179,13 +181,40 @@ func validateAdmission(obj *kueue.Workload, path *field.Path) field.ErrorList {
return allErrs
}

func validateReclaimablePods(obj *kueue.Workload, basePath *field.Path) field.ErrorList {
if len(obj.Status.ReclaimablePods) == 0 {
return nil
}
knowPodSets := make(map[string]*kueue.PodSet, len(obj.Spec.PodSets))
knowPodSetNames := make([]string, len(obj.Spec.PodSets))
for i := range obj.Spec.PodSets {
name := obj.Spec.PodSets[i].Name
knowPodSets[name] = &obj.Spec.PodSets[i]
knowPodSetNames = append(knowPodSetNames, name)
}

var ret field.ErrorList
for i := range obj.Status.ReclaimablePods {
rps := &obj.Status.ReclaimablePods[i]
ps, found := knowPodSets[rps.Name]
rpsPath := basePath.Key(rps.Name)
if !found {
ret = append(ret, field.NotSupported(rpsPath.Child("name"), rps.Name, knowPodSetNames))
} else if rps.Count > ps.Count {
ret = append(ret, field.Invalid(rpsPath.Child("count"), rps.Count, fmt.Sprintf("should be less or equal to %d", ps.Count)))
}
}
return ret
}

func ValidateWorkloadUpdate(newObj, oldObj *kueue.Workload) field.ErrorList {
var allErrs field.ErrorList
specPath := field.NewPath("spec")
allErrs = append(allErrs, ValidateWorkload(newObj)...)
allErrs = append(allErrs, apivalidation.ValidateImmutableField(newObj.Spec.PodSets, oldObj.Spec.PodSets, specPath.Child("podSets"))...)
if workload.IsAdmitted(newObj) && workload.IsAdmitted(oldObj) {
allErrs = append(allErrs, apivalidation.ValidateImmutableField(newObj.Spec.QueueName, oldObj.Spec.QueueName, specPath.Child("queueName"))...)
allErrs = append(allErrs, validateReclaimablePodsUpdate(newObj, oldObj, field.NewPath("status", "reclaimablePods"))...)
}
allErrs = append(allErrs, validateAdmissionUpdate(newObj.Status.Admission, oldObj.Status.Admission, field.NewPath("status", "admission"))...)

Expand All @@ -200,3 +229,39 @@ func validateAdmissionUpdate(new, old *kueue.Admission, path *field.Path) field.
}
return apivalidation.ValidateImmutableField(new, old, path)
}

// validateReclaimablePodsUpdate validates that the reclaimable counts do not decrease, this should be checked
// while the workload is admitted.
func validateReclaimablePodsUpdate(newObj, oldObj *kueue.Workload, basePath *field.Path) field.ErrorList {
if workload.ReclaimablePodsAreEqual(newObj.Status.ReclaimablePods, oldObj.Status.ReclaimablePods) {
return nil
}

if len(oldObj.Status.ReclaimablePods) == 0 {
return nil
}

knowPodSets := make(map[string]*kueue.ReclaimablePod, len(oldObj.Status.ReclaimablePods))
for i := range oldObj.Status.ReclaimablePods {
name := oldObj.Status.ReclaimablePods[i].Name
knowPodSets[name] = &oldObj.Status.ReclaimablePods[i]
}

var ret field.ErrorList
newNames := sets.New[string]()
for i := range newObj.Status.ReclaimablePods {
newCount := &newObj.Status.ReclaimablePods[i]
newNames.Insert(newCount.Name)
oldCount, found := knowPodSets[newCount.Name]
if found && newCount.Count < oldCount.Count {
ret = append(ret, field.Invalid(basePath.Key(newCount.Name).Child("count"), newCount.Count, fmt.Sprintf("cannot be less then %d", oldCount.Count)))
}
}

for name := range knowPodSets {
if !newNames.Has(name) {
ret = append(ret, field.Required(basePath.Key(name), "cannot be removed"))
}
}
return ret
}
83 changes: 83 additions & 0 deletions apis/kueue/webhooks/workload_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,21 @@ func TestValidateWorkload(t *testing.T) {
field.Invalid(firstPodSetSpecPath.Child("containers").Index(0).Child("resources", "requests").Key(string(corev1.ResourcePods)), nil, ""),
},
},
"invalid reclaimablePods": {
workload: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).
PodSets(
*testingutil.MakePodSet("ps1", 3).Obj(),
).
ReclaimablePods(
kueue.ReclaimablePod{Name: "ps1", Count: 4},
kueue.ReclaimablePod{Name: "ps2", Count: 1},
).
Obj(),
wantErr: field.ErrorList{
field.Invalid(statusPath.Child("reclaimablePods").Key("ps1").Child("count"), nil, ""),
field.NotSupported(statusPath.Child("reclaimablePods").Key("ps2").Child("name"), nil, nil),
},
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
Expand Down Expand Up @@ -315,6 +330,74 @@ func TestValidateWorkloadUpdate(t *testing.T) {
field.Invalid(field.NewPath("status", "admission"), nil, ""),
},
},

"reclaimable pod count can change up": {
before: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).
PodSets(
*testingutil.MakePodSet("ps1", 3).Obj(),
*testingutil.MakePodSet("ps2", 3).Obj(),
).
Admit(
testingutil.MakeAdmission("cluster-queue").
PodSets(kueue.PodSetAssignment{Name: "ps1"}, kueue.PodSetAssignment{Name: "ps2"}).
Obj(),
).
ReclaimablePods(
kueue.ReclaimablePod{Name: "ps1", Count: 1},
).
Obj(),
after: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).
PodSets(
*testingutil.MakePodSet("ps1", 3).Obj(),
*testingutil.MakePodSet("ps2", 3).Obj(),
).
Admit(
testingutil.MakeAdmission("cluster-queue").
PodSets(kueue.PodSetAssignment{Name: "ps1"}, kueue.PodSetAssignment{Name: "ps2"}).
Obj(),
).
ReclaimablePods(
kueue.ReclaimablePod{Name: "ps1", Count: 2},
kueue.ReclaimablePod{Name: "ps2", Count: 1},
).
Obj(),
wantErr: nil,
},
"reclaimable pod count cannot change down": {
before: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).
PodSets(
*testingutil.MakePodSet("ps1", 3).Obj(),
*testingutil.MakePodSet("ps2", 3).Obj(),
).
Admit(
testingutil.MakeAdmission("cluster-queue").
PodSets(kueue.PodSetAssignment{Name: "ps1"}, kueue.PodSetAssignment{Name: "ps2"}).
Obj(),
).
ReclaimablePods(
kueue.ReclaimablePod{Name: "ps1", Count: 2},
kueue.ReclaimablePod{Name: "ps2", Count: 1},
).
Obj(),
after: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).
PodSets(
*testingutil.MakePodSet("ps1", 3).Obj(),
*testingutil.MakePodSet("ps2", 3).Obj(),
).
Admit(
testingutil.MakeAdmission("cluster-queue").
PodSets(kueue.PodSetAssignment{Name: "ps1"}, kueue.PodSetAssignment{Name: "ps2"}).
Obj(),
).
ReclaimablePods(
kueue.ReclaimablePod{Name: "ps1", Count: 1},
).
Obj(),
wantErr: field.ErrorList{
field.Invalid(field.NewPath("status", "reclaimablePods").Key("ps1").Child("count"), nil, ""),
field.Required(field.NewPath("status", "reclaimablePods").Key("ps2"), ""),
},
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
Expand Down

0 comments on commit de1ca9e

Please sign in to comment.