Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamically reclaiming resources #756

Merged
merged 7 commits into from
May 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions apis/kueue/v1beta1/workload_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,14 @@ type PodSetAssignment struct {
//
// Beside what is provided in podSet's specs, this calculation takes into account
// the LimitRange defaults and RuntimeClass overheads at the moment of admission.
trasc marked this conversation as resolved.
Show resolved Hide resolved
// This field will not change in case of quota reclaim.
ResourceUsage corev1.ResourceList `json:"resourceUsage,omitempty"`

// count is the number of pods taken into account at admission time.
trasc marked this conversation as resolved.
Show resolved Hide resolved
// This field will not change in case of quota reclaim.
// Value could be zero for Workloads created before this field was added.
// +kubebuilder:validation:Minimum=0
Count int32 `json:"count"`
alculquicondor marked this conversation as resolved.
Show resolved Hide resolved
}

type PodSet struct {
Expand Down Expand Up @@ -124,6 +131,22 @@ type WorkloadStatus struct {
// +listType=map
// +listMapKey=type
Conditions []metav1.Condition `json:"conditions,omitempty"`

// reclaimablePods keeps track of the number pods within a podset for which
// the resource reservation is no longer needed.
// +optional
// +listType=map
// +listMapKey=name
ReclaimablePods []ReclaimablePod `json:"reclaimablePods,omitempty"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm having second thoughts here.

If we assume that the admission section can be modified after admission (to update Count and usage), then I prefer we have reclaimableCount inside the admission struct.

I think this might be a better idea as we move towards elastic jobs. @tenzen-y ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then I prefer we have reclaimableCount inside the admission struct.

Does this mean we have reclaimableCount inside the admission instead of having this ReclaimablePods (removing ReclaimablePods )?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, that's what I'm thinking.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, we selected ReclaimablePods instead of reclaimableCount since it could be misleading when the job is suspended.

#742 (comment)

However, as @alculquicondor says, we can ignore that concern if we have reclaimableCount inside the Admission.

I think this might be a better idea as we move towards elastic jobs.

right. The Admission has the reclaimableCount would be more natural for the elastic job.

@trasc Are you concerned about having reclaimableCount inside the Admission?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we assume that the admission section can be modified after admission (to update Count and usage), then I prefer we have reclaimableCount inside the admission struct.

The admission is not changed when reclaimable changes.

@trasc Are you concerned about having reclaimableCount inside the Admission?

SSA conflicts. We should keep them separated.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's say we have elastic jobs. Then the kueue scheduler+preemption could update count.

However, this is generally independent from the concept of reclaiming pods.

Ok, let's keep it like this.

Any reason why not make this map[string]int32?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just to look similar to flavors and usage

}

type ReclaimablePod struct {
// name is the PodSet name.
Name string `json:"name"`

// count is the number of pods for which the requested resources are no longer needed.
// +kubebuilder:validation:Minimum=0
Count int32 `json:"count"`
}

const (
Expand Down
20 changes: 20 additions & 0 deletions apis/kueue/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

79 changes: 76 additions & 3 deletions apis/kueue/webhooks/workload_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package webhooks

import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"
apivalidation "k8s.io/apimachinery/pkg/api/validation"
Expand Down Expand Up @@ -121,6 +122,7 @@ func ValidateWorkload(obj *kueue.Workload) field.ErrorList {
}

allErrs = append(allErrs, metav1validation.ValidateConditions(obj.Status.Conditions, statusPath.Child("conditions"))...)
allErrs = append(allErrs, validateReclaimablePods(obj, statusPath.Child("reclaimablePods"))...)

return allErrs
}
Expand Down Expand Up @@ -165,27 +167,62 @@ func validateAdmission(obj *kueue.Workload, path *field.Path) field.ErrorList {
for _, ps := range obj.Spec.PodSets {
names.Insert(ps.Name)
}
psFlavorsPath := path.Child("podSetFlavors")
assigmentsPath := path.Child("podSetAssignments")
if names.Len() != len(admission.PodSetAssignments) {
allErrs = append(allErrs, field.Invalid(psFlavorsPath, field.OmitValueType{}, "must have the same number of podSets as the spec"))
allErrs = append(allErrs, field.Invalid(assigmentsPath, field.OmitValueType{}, "must have the same number of podSets as the spec"))
}

for i, ps := range admission.PodSetAssignments {
psaPath := assigmentsPath.Index(i)
if !names.Has(ps.Name) {
allErrs = append(allErrs, field.NotFound(psFlavorsPath.Index(i).Child("name"), ps.Name))
allErrs = append(allErrs, field.NotFound(psaPath.Child("name"), ps.Name))
}
if ps.Count > 0 {
for k, v := range ps.ResourceUsage {
if (workload.ResourceValue(k, v) % int64(ps.Count)) != 0 {
allErrs = append(allErrs, field.Invalid(psaPath.Child("resourceUsage").Key(string(k)), v, fmt.Sprintf("is not a multiple of %d", ps.Count)))
}
}
}
}

return allErrs
}

func validateReclaimablePods(obj *kueue.Workload, basePath *field.Path) field.ErrorList {
if len(obj.Status.ReclaimablePods) == 0 {
return nil
}
knowPodSets := make(map[string]*kueue.PodSet, len(obj.Spec.PodSets))
knowPodSetNames := make([]string, len(obj.Spec.PodSets))
for i := range obj.Spec.PodSets {
name := obj.Spec.PodSets[i].Name
knowPodSets[name] = &obj.Spec.PodSets[i]
knowPodSetNames = append(knowPodSetNames, name)
}

var ret field.ErrorList
for i := range obj.Status.ReclaimablePods {
rps := &obj.Status.ReclaimablePods[i]
ps, found := knowPodSets[rps.Name]
rpsPath := basePath.Key(rps.Name)
if !found {
ret = append(ret, field.NotSupported(rpsPath.Child("name"), rps.Name, knowPodSetNames))
} else if rps.Count > ps.Count {
ret = append(ret, field.Invalid(rpsPath.Child("count"), rps.Count, fmt.Sprintf("should be less or equal to %d", ps.Count)))
}
}
return ret
}

func ValidateWorkloadUpdate(newObj, oldObj *kueue.Workload) field.ErrorList {
var allErrs field.ErrorList
specPath := field.NewPath("spec")
allErrs = append(allErrs, ValidateWorkload(newObj)...)
allErrs = append(allErrs, apivalidation.ValidateImmutableField(newObj.Spec.PodSets, oldObj.Spec.PodSets, specPath.Child("podSets"))...)
if workload.IsAdmitted(newObj) && workload.IsAdmitted(oldObj) {
allErrs = append(allErrs, apivalidation.ValidateImmutableField(newObj.Spec.QueueName, oldObj.Spec.QueueName, specPath.Child("queueName"))...)
allErrs = append(allErrs, validateReclaimablePodsUpdate(newObj, oldObj, field.NewPath("status", "reclaimablePods"))...)
}
allErrs = append(allErrs, validateAdmissionUpdate(newObj.Status.Admission, oldObj.Status.Admission, field.NewPath("status", "admission"))...)

Expand All @@ -200,3 +237,39 @@ func validateAdmissionUpdate(new, old *kueue.Admission, path *field.Path) field.
}
return apivalidation.ValidateImmutableField(new, old, path)
}

// validateReclaimablePodsUpdate validates that the reclaimable counts do not decrease, this should be checked
// while the workload is admitted.
func validateReclaimablePodsUpdate(newObj, oldObj *kueue.Workload, basePath *field.Path) field.ErrorList {
if workload.ReclaimablePodsAreEqual(newObj.Status.ReclaimablePods, oldObj.Status.ReclaimablePods) {
return nil
}

if len(oldObj.Status.ReclaimablePods) == 0 {
return nil
}

knowPodSets := make(map[string]*kueue.ReclaimablePod, len(oldObj.Status.ReclaimablePods))
for i := range oldObj.Status.ReclaimablePods {
name := oldObj.Status.ReclaimablePods[i].Name
knowPodSets[name] = &oldObj.Status.ReclaimablePods[i]
}

var ret field.ErrorList
newNames := sets.New[string]()
for i := range newObj.Status.ReclaimablePods {
newCount := &newObj.Status.ReclaimablePods[i]
newNames.Insert(newCount.Name)
oldCount, found := knowPodSets[newCount.Name]
if found && newCount.Count < oldCount.Count {
ret = append(ret, field.Invalid(basePath.Key(newCount.Name).Child("count"), newCount.Count, fmt.Sprintf("cannot be less then %d", oldCount.Count)))
}
trasc marked this conversation as resolved.
Show resolved Hide resolved
}

for name := range knowPodSets {
if !newNames.Has(name) {
ret = append(ret, field.Required(basePath.Key(name), "cannot be removed"))
}
}
return ret
}
105 changes: 101 additions & 4 deletions apis/kueue/webhooks/workload_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,12 @@ func TestValidateWorkload(t *testing.T) {
field.Invalid(statusPath.Child("admission", "clusterQueue"), nil, ""),
},
},
"should have a valid podSet name in status": {
"should have a valid podSet name in status assigment": {
workload: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).
Admit(testingutil.MakeAdmission("cluster-queue", "@invalid").Obj()).
Obj(),
wantErr: field.ErrorList{
field.NotFound(statusPath.Child("admission", "podSetFlavors").Index(0).Child("name"), nil),
field.NotFound(statusPath.Child("admission", "podSetAssignments").Index(0).Child("name"), nil),
},
},
"should have same podSets in admission": {
Expand All @@ -179,8 +179,22 @@ func TestValidateWorkload(t *testing.T) {
Admit(testingutil.MakeAdmission("cluster-queue", "main1", "main2", "main3").Obj()).
Obj(),
wantErr: field.ErrorList{
field.Invalid(statusPath.Child("admission", "podSetFlavors"), nil, ""),
field.NotFound(statusPath.Child("admission", "podSetFlavors").Index(2).Child("name"), nil),
field.Invalid(statusPath.Child("admission", "podSetAssignments"), nil, ""),
field.NotFound(statusPath.Child("admission", "podSetAssignments").Index(2).Child("name"), nil),
},
},
"assignment usage should be divisible by count": {
workload: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).
PodSets(*testingutil.MakePodSet("main", 3).
Request(corev1.ResourceCPU, "1").
Obj()).
Admit(testingutil.MakeAdmission("cluster-queue").
Assignment(corev1.ResourceCPU, "flv", "1").
AssignmentPodCount(3).
Obj()).
Obj(),
wantErr: field.ErrorList{
field.Invalid(statusPath.Child("admission", "podSetAssignments").Index(0).Child("resourceUsage").Key(string(corev1.ResourceCPU)), nil, ""),
},
},
"should not request num-pods resource": {
Expand Down Expand Up @@ -217,6 +231,21 @@ func TestValidateWorkload(t *testing.T) {
field.Invalid(firstPodSetSpecPath.Child("containers").Index(0).Child("resources", "requests").Key(string(corev1.ResourcePods)), nil, ""),
},
},
"invalid reclaimablePods": {
workload: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).
PodSets(
*testingutil.MakePodSet("ps1", 3).Obj(),
).
ReclaimablePods(
kueue.ReclaimablePod{Name: "ps1", Count: 4},
kueue.ReclaimablePod{Name: "ps2", Count: 1},
).
Obj(),
wantErr: field.ErrorList{
field.Invalid(statusPath.Child("reclaimablePods").Key("ps1").Child("count"), nil, ""),
field.NotSupported(statusPath.Child("reclaimablePods").Key("ps2").Child("name"), nil, nil),
},
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
Expand Down Expand Up @@ -315,6 +344,74 @@ func TestValidateWorkloadUpdate(t *testing.T) {
field.Invalid(field.NewPath("status", "admission"), nil, ""),
},
},

"reclaimable pod count can change up": {
before: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).
PodSets(
*testingutil.MakePodSet("ps1", 3).Obj(),
*testingutil.MakePodSet("ps2", 3).Obj(),
).
Admit(
testingutil.MakeAdmission("cluster-queue").
PodSets(kueue.PodSetAssignment{Name: "ps1"}, kueue.PodSetAssignment{Name: "ps2"}).
Obj(),
).
ReclaimablePods(
kueue.ReclaimablePod{Name: "ps1", Count: 1},
).
Obj(),
after: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).
PodSets(
*testingutil.MakePodSet("ps1", 3).Obj(),
*testingutil.MakePodSet("ps2", 3).Obj(),
).
Admit(
testingutil.MakeAdmission("cluster-queue").
PodSets(kueue.PodSetAssignment{Name: "ps1"}, kueue.PodSetAssignment{Name: "ps2"}).
Obj(),
).
ReclaimablePods(
kueue.ReclaimablePod{Name: "ps1", Count: 2},
kueue.ReclaimablePod{Name: "ps2", Count: 1},
).
Obj(),
wantErr: nil,
},
"reclaimable pod count cannot change down": {
before: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).
PodSets(
*testingutil.MakePodSet("ps1", 3).Obj(),
*testingutil.MakePodSet("ps2", 3).Obj(),
).
Admit(
testingutil.MakeAdmission("cluster-queue").
PodSets(kueue.PodSetAssignment{Name: "ps1"}, kueue.PodSetAssignment{Name: "ps2"}).
Obj(),
).
ReclaimablePods(
kueue.ReclaimablePod{Name: "ps1", Count: 2},
kueue.ReclaimablePod{Name: "ps2", Count: 1},
).
Obj(),
after: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).
PodSets(
*testingutil.MakePodSet("ps1", 3).Obj(),
*testingutil.MakePodSet("ps2", 3).Obj(),
).
Admit(
testingutil.MakeAdmission("cluster-queue").
PodSets(kueue.PodSetAssignment{Name: "ps1"}, kueue.PodSetAssignment{Name: "ps2"}).
Obj(),
).
ReclaimablePods(
kueue.ReclaimablePod{Name: "ps1", Count: 1},
).
Obj(),
wantErr: field.ErrorList{
field.Invalid(field.NewPath("status", "reclaimablePods").Key("ps1").Child("count"), nil, ""),
field.Required(field.NewPath("status", "reclaimablePods").Key("ps2"), ""),
},
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
Expand Down
34 changes: 33 additions & 1 deletion charts/kueue/templates/crd/kueue.x-k8s.io_workloads.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8080,6 +8080,14 @@ spec:
each of the .spec.podSets entries.
items:
properties:
count:
description: count is the number of pods taken into account
at admission time. This field will not change in case
of quota reclaim. Value could be zero for Workloads created
before this field was added.
format: int32
minimum: 0
type: integer
flavors:
additionalProperties:
description: ResourceFlavorReference is the name of the
Expand All @@ -8104,9 +8112,11 @@ spec:
all the pods in the podset need to run. \n Beside what
is provided in podSet's specs, this calculation takes
into account the LimitRange defaults and RuntimeClass
overheads at the moment of admission."
overheads at the moment of admission. This field will
not change in case of quota reclaim."
type: object
required:
- count
- name
type: object
type: array
Expand Down Expand Up @@ -8194,6 +8204,28 @@ spec:
x-kubernetes-list-map-keys:
- type
x-kubernetes-list-type: map
reclaimablePods:
description: reclaimablePods keeps track of the number pods within
a podset for which the resource reservation is no longer needed.
items:
properties:
count:
description: count is the number of pods for which the requested
resources are no longer needed.
format: int32
minimum: 0
type: integer
name:
description: name is the PodSet name.
type: string
required:
- count
- name
type: object
type: array
x-kubernetes-list-map-keys:
- name
x-kubernetes-list-type: map
type: object
type: object
served: true
Expand Down
Loading