Swap admitted condition in the same API call as admission or eviction #654

Merged
16 changes: 16 additions & 0 deletions pkg/cache/cache.go
@@ -703,6 +703,22 @@ func (c *Cache) DeleteWorkload(w *kueue.Workload) error {
 	return nil
 }
 
+func (c *Cache) IsAssumedOrAdmittedWorkload(w workload.Info) bool {
+	c.Lock()
+	defer c.Unlock()
+
+	k := workload.Key(w.Obj)
+	if _, assumed := c.assumedWorkloads[k]; assumed {
+		return true
+	}
+	if cq, exists := c.clusterQueues[w.ClusterQueue]; exists {
+		if _, admitted := cq.Workloads[k]; admitted {
+			return true
+		}
+	}
+	return false
+}
+
 func (c *Cache) AssumeWorkload(w *kueue.Workload) error {
 	c.Lock()
 	defer c.Unlock()
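
Note: this helper is the guard the scheduler uses during nomination (see pkg/scheduler/scheduler.go below) to skip workloads whose admission the cache already accounts for, either assumed locally or recorded in a ClusterQueue. A minimal usage sketch, assuming a populated *Cache c and a workload.Info wi; the surrounding loop is hypothetical, not part of this diff:

	// Skip objects the cache already tracks; admitting one again would
	// double-count its resource usage against the ClusterQueue quota.
	if c.IsAssumedOrAdmittedWorkload(wi) {
		continue
	}
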
13 changes: 11 additions & 2 deletions pkg/controller/core/clusterqueue_controller.go
@@ -137,8 +137,17 @@ func (r *ClusterQueueReconciler) Reconcile(ctx context.Context, req ctrl.Request
 	return ctrl.Result{}, nil
 }
 
-func (r *ClusterQueueReconciler) NotifyWorkloadUpdate(w *kueue.Workload) {
-	r.wlUpdateCh <- event.GenericEvent{Object: w}
+func (r *ClusterQueueReconciler) NotifyWorkloadUpdate(oldWl, newWl *kueue.Workload) {
+	if oldWl != nil {
+		r.wlUpdateCh <- event.GenericEvent{Object: oldWl}
+		if newWl != nil && oldWl.Spec.QueueName != newWl.Spec.QueueName {
+			r.wlUpdateCh <- event.GenericEvent{Object: newWl}
+		}
+		return
+	}
+	if newWl != nil {
+		r.wlUpdateCh <- event.GenericEvent{Object: newWl}
+	}
 }
 
 func (r *ClusterQueueReconciler) notifyWatchers(oldCQ, newCQ *kueue.ClusterQueue) {
13 changes: 11 additions & 2 deletions pkg/controller/core/localqueue_controller.go
@@ -63,8 +63,17 @@ func NewLocalQueueReconciler(client client.Client, queues *queue.Manager, cache
 	}
 }
 
-func (r *LocalQueueReconciler) NotifyWorkloadUpdate(w *kueue.Workload) {
-	r.wlUpdateCh <- event.GenericEvent{Object: w}
+func (r *LocalQueueReconciler) NotifyWorkloadUpdate(oldWl, newWl *kueue.Workload) {
+	if oldWl != nil {
+		r.wlUpdateCh <- event.GenericEvent{Object: oldWl}
+		if newWl != nil && oldWl.Spec.QueueName != newWl.Spec.QueueName {
+			r.wlUpdateCh <- event.GenericEvent{Object: newWl}
+		}
+		return
+	}
+	if newWl != nil {
+		r.wlUpdateCh <- event.GenericEvent{Object: newWl}
+	}
 }
 
 //+kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update
96 changes: 31 additions & 65 deletions pkg/controller/core/workload_controller.go
@@ -37,7 +37,6 @@ import (

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/queue"
"sigs.k8s.io/kueue/pkg/util/limitrange"
@@ -49,20 +48,7 @@ const (
 	// statuses for logging purposes
 	pending  = "pending"
 	admitted = "admitted"
-	// The cancellingAdmission status means that Admission=nil, but the workload
-	// still has the Admitted=True condition.
-	//
-	// This is a transient state as the workload controller is about to set the
-	// Admitted condition to False, transitioning the workload into the pending
-	// status. Only once the workload reaches the pending status is it requeued
-	// so that the scheduler can re-admit it.
-	//
-	// Cancellation of admission can be triggered in the following scenarios:
-	// - exceeding the timeout to reach the PodsReady=True condition when the waitForPodsReady configuration is enabled
-	// - workload preemption
-	// - clearing of the Admission field manually via API
-	cancellingAdmission = "cancellingAdmission"
-	finished            = "finished"
+	finished = "finished"
 )
 
 var (
@@ -95,7 +81,7 @@ func WithWorkloadUpdateWatchers(value ...WorkloadUpdateWatcher) Option {
 var defaultOptions = options{}
 
 type WorkloadUpdateWatcher interface {
-	NotifyWorkloadUpdate(*kueue.Workload)
+	NotifyWorkloadUpdate(oldWl, newWl *kueue.Workload)
 }
 
 // WorkloadReconciler reconciles a Workload object
@@ -141,41 +127,31 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 	ctx = ctrl.LoggerInto(ctx, log)
 	log.V(2).Info("Reconciling Workload")
 
-	status := workloadStatus(&wl)
-	switch status {
-	case pending:
-		if !r.queues.QueueForWorkloadExists(&wl) {
-			err := workload.UpdateStatusIfChanged(ctx, r.client, &wl, kueue.WorkloadAdmitted, metav1.ConditionFalse,
-				"Inadmissible", fmt.Sprintf("LocalQueue %s doesn't exist", wl.Spec.QueueName), constants.AdmissionName)
-			return ctrl.Result{}, client.IgnoreNotFound(err)
-		}
+	if apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) {
+		return ctrl.Result{}, nil
+	}
+	if apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadAdmitted) {
+		return r.reconcileNotReadyTimeout(ctx, req, &wl)
+	}
+
+	if !r.queues.QueueForWorkloadExists(&wl) {
+		err := workload.UnsetAdmissionWithCondition(ctx, r.client, &wl,
+			"Inadmissible", fmt.Sprintf("LocalQueue %s doesn't exist", wl.Spec.QueueName))
+		return ctrl.Result{}, client.IgnoreNotFound(err)
+	}
 
-		cqName, cqOk := r.queues.ClusterQueueForWorkload(&wl)
-		if !cqOk {
-			err := workload.UpdateStatusIfChanged(ctx, r.client, &wl, kueue.WorkloadAdmitted, metav1.ConditionFalse,
-				"Inadmissible", fmt.Sprintf("ClusterQueue %s doesn't exist", cqName), constants.AdmissionName)
-			return ctrl.Result{}, client.IgnoreNotFound(err)
-		}
+	cqName, cqOk := r.queues.ClusterQueueForWorkload(&wl)
+	if !cqOk {
+		err := workload.UnsetAdmissionWithCondition(ctx, r.client, &wl,
+			"Inadmissible", fmt.Sprintf("ClusterQueue %s doesn't exist", cqName))
+		return ctrl.Result{}, client.IgnoreNotFound(err)
+	}
 
-		if !r.cache.ClusterQueueActive(cqName) {
-			err := workload.UpdateStatusIfChanged(ctx, r.client, &wl, kueue.WorkloadAdmitted, metav1.ConditionFalse,
-				"Inadmissible", fmt.Sprintf("ClusterQueue %s is inactive", cqName), constants.AdmissionName)
-			return ctrl.Result{}, client.IgnoreNotFound(err)
-		}
-	case cancellingAdmission:
-		err := workload.UpdateStatusIfChanged(ctx, r.client, &wl, kueue.WorkloadAdmitted, metav1.ConditionFalse,
-			"AdmissionCancelled", "Admission cancelled", constants.AdmissionName)
-		return ctrl.Result{}, client.IgnoreNotFound(err)
-	case admitted:
-		if apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadAdmitted) {
-			return r.reconcileNotReadyTimeout(ctx, req, &wl)
-		} else {
-			msg := fmt.Sprintf("Admitted by ClusterQueue %s", wl.Status.Admission.ClusterQueue)
-			err := workload.UpdateStatusIfChanged(ctx, r.client, &wl, kueue.WorkloadAdmitted, metav1.ConditionTrue, "AdmissionByKueue", msg, constants.AdmissionName)
-			return ctrl.Result{}, client.IgnoreNotFound(err)
-		}
-	}
+	if !r.cache.ClusterQueueActive(cqName) {
+		err := workload.UnsetAdmissionWithCondition(ctx, r.client, &wl,
+			"Inadmissible", fmt.Sprintf("ClusterQueue %s is inactive", cqName))
+		return ctrl.Result{}, client.IgnoreNotFound(err)
+	}
 	return ctrl.Result{}, nil
 }

@@ -189,7 +165,8 @@ func (r *WorkloadReconciler) reconcileNotReadyTimeout(ctx context.Context, req c
 		return ctrl.Result{RequeueAfter: recheckAfter}, nil
 	} else {
 		klog.V(2).InfoS("Cancelling admission of the workload due to exceeding the PodsReady timeout", "workload", req.NamespacedName.String())
-		err := r.client.Status().Patch(ctx, workload.BaseSSAWorkload(wl), client.Apply, client.FieldOwner(constants.AdmissionName))
+		err := workload.UnsetAdmissionWithCondition(ctx, r.client, wl,
+			"Evicted", fmt.Sprintf("Exceeded the PodsReady timeout %s", req.NamespacedName.String()))
 		return ctrl.Result{}, client.IgnoreNotFound(err)
 	}
 }
@@ -200,7 +177,7 @@ func (r *WorkloadReconciler) Create(e event.CreateEvent) bool {
 		// this event will be handled by the LimitRange/RuntimeClass handle
 		return true
 	}
-	defer r.notifyWatchers(wl)
+	defer r.notifyWatchers(nil, wl)
 	status := workloadStatus(wl)
 	log := r.log.WithValues("workload", klog.KObj(wl), "queue", wl.Spec.QueueName, "status", status)
 	log.V(2).Info("Workload create event")
@@ -231,7 +208,7 @@ func (r *WorkloadReconciler) Delete(e event.DeleteEvent) bool {
 		// this event will be handled by the LimitRange/RuntimeClass handle
 		return true
 	}
-	defer r.notifyWatchers(wl)
+	defer r.notifyWatchers(wl, nil)
 	status := "unknown"
 	if !e.DeleteStateUnknown {
 		status = workloadStatus(wl)
@@ -272,8 +249,7 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {
 		return true
 	}
 	wl := e.ObjectNew.(*kueue.Workload)
-	defer r.notifyWatchers(oldWl)
-	defer r.notifyWatchers(wl)
+	defer r.notifyWatchers(oldWl, wl)
 
 	status := workloadStatus(wl)
 	log := r.log.WithValues("workload", klog.KObj(wl), "queue", wl.Spec.QueueName, "status", status)
@@ -324,14 +300,7 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {
 		if !r.cache.AddOrUpdateWorkload(wlCopy) {
 			log.V(2).Info("ClusterQueue for workload didn't exist; ignored for now")
 		}
-	case prevStatus == admitted && status == cancellingAdmission:
-		// The workload will be requeued when handling the transition
-		// from cancellingAdmission to pending.
-		// When the workload is in the cancellingAdmission status, the only
-		// transition possible, triggered by the workload controller, is to the
-		// pending status. The scheduler can only re-admit the workload once
-		// it is requeued after reaching the pending status.
-	case prevStatus == cancellingAdmission && status == pending:
+	case prevStatus == admitted && status == pending:
 		// trigger the move of associated inadmissibleWorkloads, if there are any.
 		r.queues.QueueAssociatedInadmissibleWorkloadsAfter(ctx, wl, func() {
 			// Delete the workload from cache while holding the queues lock
@@ -361,9 +330,9 @@ func (r *WorkloadReconciler) Generic(e event.GenericEvent) bool {
 	return false
 }
 
-func (r *WorkloadReconciler) notifyWatchers(wl *kueue.Workload) {
+func (r *WorkloadReconciler) notifyWatchers(oldWl, newWl *kueue.Workload) {
 	for _, w := range r.watchers {
-		w.NotifyWorkloadUpdate(wl)
+		w.NotifyWorkloadUpdate(oldWl, newWl)
 	}
 }

@@ -419,11 +388,8 @@ func workloadStatus(w *kueue.Workload) string {
 	if apimeta.IsStatusConditionTrue(w.Status.Conditions, kueue.WorkloadFinished) {
 		return finished
 	}
-	if w.Status.Admission != nil {
-		return admitted
-	}
 	if apimeta.IsStatusConditionTrue(w.Status.Conditions, kueue.WorkloadAdmitted) {
-		return cancellingAdmission
+		return admitted
 	}
 	return pending
 }
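
Note: because admission and the Admitted condition now change in the same API call, the transient cancellingAdmission state can no longer be observed, and workloadStatus collapses to three values derived purely from conditions: finished (WorkloadFinished=True), admitted (WorkloadAdmitted=True), and pending (everything else). The Status.Admission field no longer participates in status derivation.
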
10 changes: 9 additions & 1 deletion pkg/scheduler/preemption/preemption.go
@@ -128,7 +128,15 @@ func (p *Preemptor) issuePreemptions(ctx context.Context, targets []*workload.In
 	defer cancel()
 	workqueue.ParallelizeUntil(ctx, parallelPreemptions, len(targets), func(i int) {
 		target := targets[i]
-		err := p.applyPreemption(ctx, workload.BaseSSAWorkload(target.Obj))
+		patch := workload.BaseSSAWorkload(target.Obj)
+		patch.Status.Conditions = []metav1.Condition{{
+			Type:               kueue.WorkloadAdmitted,
+			Status:             metav1.ConditionFalse,
+			LastTransitionTime: metav1.Now(),
+			Reason:             "Preempted",
+			Message:            "Preempted to accommodate a higher priority Workload",
+		}}
+		err := p.applyPreemption(ctx, patch)
 		if err != nil {
 			errCh.SendErrorWithCancel(err, cancel)
 			return
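
Note: workload.BaseSSAWorkload strips the object down to its identifying fields, so this single server-side-apply patch records Admitted=False with the Preempted reason and, assuming Status.Admission was previously applied by the same field manager, simultaneously removes it because it is absent from the applied object. This is the eviction half of the PR title: the condition flip no longer waits for a separate update from the workload controller.
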
19 changes: 15 additions & 4 deletions pkg/scheduler/scheduler.go
@@ -176,7 +176,7 @@ func (s *Scheduler) schedule(ctx context.Context) {
 			log.V(5).Info("Waiting for all admitted workloads to be in the PodsReady condition")
 			// Block admission until all currently admitted workloads are in
 			// PodsReady condition if the waitForPodsReady is enabled
-			if err := workload.UpdateStatus(ctx, s.client, e.Obj, kueue.WorkloadAdmitted, metav1.ConditionFalse, "Waiting", "waiting for all admitted workloads to be in PodsReady condition", constants.AdmissionName); err != nil {
+			if err := workload.UnsetAdmissionWithCondition(ctx, s.client, e.Obj, "Waiting", "waiting for all admitted workloads to be in PodsReady condition"); err != nil {
 				log.Error(err, "Could not update Workload status")
 			}
 			s.cache.WaitForPodsReady(ctx)
@@ -240,7 +240,10 @@ func (s *Scheduler) nominate(ctx context.Context, workloads []workload.Info, sna
 		cq := snap.ClusterQueues[w.ClusterQueue]
 		ns := corev1.Namespace{}
 		e := entry{Info: w}
-		if snap.InactiveClusterQueueSets.Has(w.ClusterQueue) {
+		if s.cache.IsAssumedOrAdmittedWorkload(w) {
+			log.Error(fmt.Errorf("workload is assumed"), "nominate", "workloadInfo", w)
+			continue
+		} else if snap.InactiveClusterQueueSets.Has(w.ClusterQueue) {
 			e.inadmissibleMsg = fmt.Sprintf("ClusterQueue %s is inactive", w.ClusterQueue)
 		} else if cq == nil {
 			e.inadmissibleMsg = fmt.Sprintf("ClusterQueue %s not found", w.ClusterQueue)
@@ -276,7 +279,15 @@ func (s *Scheduler) admit(ctx context.Context, e *entry) error {
log.V(2).Info("Workload assumed in the cache")

s.admissionRoutineWrapper.Run(func() {
err := s.applyAdmission(ctx, workload.AdmissionPatch(newWorkload))
patch := workload.AdmissionPatch(newWorkload)
patch.Status.Conditions = []metav1.Condition{{
Type: kueue.WorkloadAdmitted,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.Now(),
Reason: "Admitted",
Message: fmt.Sprintf("Admitted by ClusterQueue %s", newWorkload.Status.Admission.ClusterQueue),
}}
err := s.applyAdmission(ctx, patch)
if err == nil {
waitTime := time.Since(e.Obj.CreationTimestamp.Time)
s.recorder.Eventf(newWorkload, corev1.EventTypeNormal, "Admitted", "Admitted by ClusterQueue %v, wait time was %.3fs", admission.ClusterQueue, waitTime.Seconds())
@@ -338,7 +349,7 @@ func (s *Scheduler) requeueAndUpdate(log logr.Logger, ctx context.Context, e ent
log.V(2).Info("Workload re-queued", "workload", klog.KObj(e.Obj), "clusterQueue", klog.KRef("", e.ClusterQueue), "queue", klog.KRef(e.Obj.Namespace, e.Obj.Spec.QueueName), "requeueReason", e.requeueReason, "added", added)

if e.status == notNominated {
err := workload.UpdateStatus(ctx, s.client, e.Obj, kueue.WorkloadAdmitted, metav1.ConditionFalse, "Pending", e.inadmissibleMsg, constants.AdmissionName)
err := workload.UnsetAdmissionWithCondition(ctx, s.client, e.Obj, "Pending", e.inadmissibleMsg)
if err != nil {
log.Error(err, "Could not update Workload status")
}
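
Note: this is the admission half of the PR title: the patch built from workload.AdmissionPatch now carries Status.Admission and the Admitted=True condition in one apply, so no observer can see an admitted workload whose condition still says otherwise. The nominate guard above checks the cache via IsAssumedOrAdmittedWorkload rather than conditions precisely because the workload is assumed in the cache before this patch lands.
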
18 changes: 18 additions & 0 deletions pkg/workload/workload.go
@@ -27,6 +27,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/util/api"
)

@@ -244,6 +245,23 @@ func UpdateStatusIfChanged(ctx context.Context,
 	return UpdateStatus(ctx, c, wl, conditionType, conditionStatus, reason, message, managerPrefix)
 }
 
+func UnsetAdmissionWithCondition(
+	ctx context.Context,
+	c client.Client,
+	wl *kueue.Workload,
+	reason, message string) error {
+	condition := metav1.Condition{
+		Type:               kueue.WorkloadAdmitted,
+		Status:             metav1.ConditionFalse,
+		LastTransitionTime: metav1.Now(),
+		Reason:             reason,
+		Message:            api.TruncateConditionMessage(message),
+	}
+	newWl := BaseSSAWorkload(wl)
+	newWl.Status.Conditions = []metav1.Condition{condition}
+	return c.Status().Patch(ctx, newWl, client.Apply, client.FieldOwner(constants.AdmissionName))
+}
+
 // BaseSSAWorkload creates a new object based on the input workload that
 // only contains the fields necessary to identify the original object.
 // The object can be used as a base for Server-Side-Apply.
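
Note: UnsetAdmissionWithCondition is the shared eviction helper now used by the scheduler, the workload controller, and the PodsReady timeout path. A minimal usage sketch, assuming a client.Client c and an admitted *kueue.Workload wl; the reason and message are illustrative:

	// One SSA status patch: Status.Admission is absent from the applied
	// object (BaseSSAWorkload omits it), so the admission field manager
	// relinquishes it, while the conditions list lands Admitted=False.
	if err := workload.UnsetAdmissionWithCondition(ctx, c, wl,
		"Inadmissible", "ClusterQueue is inactive"); err != nil {
		return err
	}
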
test/integration/controller/core/clusterqueue_controller_test.go
@@ -196,7 +196,9 @@ var _ = ginkgo.Describe("ClusterQueue controller", func() {
 				gomega.Eventually(func() error {
 					var newWL kueue.Workload
 					gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(w), &newWL)).To(gomega.Succeed())
-					newWL.Status.Admission = admissions[i]
+					if admissions[i] != nil {
+						return util.AdmitWorkload(ctx, k8sClient, &newWL, admissions[i])
+					}
 					return k8sClient.Status().Update(ctx, &newWL)
 				}, util.Timeout, util.Interval).Should(gomega.Succeed())
 			}
@@ -376,8 +378,7 @@ var _ = ginkgo.Describe("ClusterQueue controller", func() {
ginkgo.By("Admit workload")
wl := testing.MakeWorkload("workload", ns.Name).Queue(lq.Name).Obj()
gomega.Expect(k8sClient.Create(ctx, wl)).To(gomega.Succeed())
wl.Status.Admission = testing.MakeAdmission(cq.Name).Obj()
gomega.Expect(k8sClient.Status().Update(ctx, wl)).To(gomega.Succeed())
gomega.Expect(util.AdmitWorkload(ctx, k8sClient, wl, testing.MakeAdmission(cq.Name).Obj())).To(gomega.Succeed())

ginkgo.By("Delete clusterQueue")
gomega.Expect(util.DeleteClusterQueue(ctx, k8sClient, cq)).To(gomega.Succeed())
test/integration/controller/core/localqueue_controller_test.go
@@ -192,9 +192,8 @@ var _ = ginkgo.Describe("Queue controller", func() {
 			gomega.Eventually(func() error {
 				var newWL kueue.Workload
 				gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(w), &newWL)).To(gomega.Succeed())
-				newWL.Status.Admission = testing.MakeAdmission(clusterQueue.Name).
-					Assignment(corev1.ResourceCPU, flavorOnDemand, "1").Obj()
-				return k8sClient.Status().Update(ctx, &newWL)
+				return util.AdmitWorkload(ctx, k8sClient, &newWL, testing.MakeAdmission(clusterQueue.Name).
+					Assignment(corev1.ResourceCPU, flavorOnDemand, "1").Obj())
 			}, util.Timeout, util.Interval).Should(gomega.Succeed())
 		}
 		gomega.Eventually(func() kueue.LocalQueueStatus {