Skip to content

Commit

Permalink
Upgrade admitted logic
Browse files Browse the repository at this point in the history
  • Loading branch information
mcariatm committed Mar 21, 2023
1 parent 6839478 commit b880723
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 42 deletions.
50 changes: 9 additions & 41 deletions pkg/controller/core/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,7 @@ const (
// statuses for logging purposes
pending = "pending"
admitted = "admitted"
// The cancellingAdmission status means that Admission=nil, but the workload
// still has the Admitted=True condition.
//
// This is a transient state as the workload controller is about to set the
// Admitted condition to False, transitioning the workload into the pending
// status. Only once the workload reaches the pending status is it requeued
// so that the scheduler can re-admit it.
//
// Cancellation of admission can be triggered in the following scenarios:
// - exceeding the timeout to reach the PodsReady=True condition when the waitForPodsReady configuration is enabled
// - workload preemption
// - clearing of the Admission field manually via API
cancellingAdmission = "cancellingAdmission"
finished = "finished"
finished = "finished"
)

var (
Expand Down Expand Up @@ -134,9 +121,11 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
ctx = ctrl.LoggerInto(ctx, log)
log.V(2).Info("Reconciling Workload")

status := workloadStatus(&wl)
switch status {
case pending:
if !apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) {
if apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadAdmitted) {
return r.reconcileNotReadyTimeout(ctx, req, &wl)
}

if !r.queues.QueueForWorkloadExists(&wl) {
err := workload.UpdateStatusIfChanged(ctx, r.client, &wl, kueue.WorkloadAdmitted, metav1.ConditionFalse,
"Inadmissible", fmt.Sprintf("LocalQueue %s doesn't exist", wl.Spec.QueueName), constants.AdmissionName)
Expand All @@ -155,18 +144,6 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
"Inadmissible", fmt.Sprintf("ClusterQueue %s is inactive", cqName), constants.AdmissionName)
return ctrl.Result{}, client.IgnoreNotFound(err)
}
case cancellingAdmission:
err := workload.UpdateStatusIfChanged(ctx, r.client, &wl, kueue.WorkloadAdmitted, metav1.ConditionFalse,
"AdmissionCancelled", "Admission cancelled", constants.AdmissionName)
return ctrl.Result{}, client.IgnoreNotFound(err)
case admitted:
if apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadAdmitted) {
return r.reconcileNotReadyTimeout(ctx, req, &wl)
} else {
msg := fmt.Sprintf("Admitted by ClusterQueue %s", wl.Status.Admission.ClusterQueue)
err := workload.UpdateStatusIfChanged(ctx, r.client, &wl, kueue.WorkloadAdmitted, metav1.ConditionTrue, "AdmissionByKueue", msg, constants.AdmissionName)
return ctrl.Result{}, client.IgnoreNotFound(err)
}
}

return ctrl.Result{}, nil
Expand Down Expand Up @@ -305,14 +282,7 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {
if !r.cache.AddOrUpdateWorkload(wlCopy) {
log.V(2).Info("ClusterQueue for workload didn't exist; ignored for now")
}
case prevStatus == admitted && status == cancellingAdmission:
// The workload will be requeued when handling transitioning
// from cancellingAdmission to pending.
// When the workload is in the cancellingAdmission status, the only
// transition possible, triggered by the workload controller, is to the
// pending status. Scheduler is only able to re-admit the workload once
// requeued after reaching the pending status.
case prevStatus == cancellingAdmission && status == pending:
case status == pending:
// trigger the move of associated inadmissibleWorkloads, if there are any.
r.queues.QueueAssociatedInadmissibleWorkloadsAfter(ctx, wl, func() {
// Delete the workload from cache while holding the queues lock
Expand Down Expand Up @@ -395,12 +365,10 @@ func workloadStatus(w *kueue.Workload) string {
if apimeta.IsStatusConditionTrue(w.Status.Conditions, kueue.WorkloadFinished) {
return finished
}
if w.Status.Admission != nil {
return admitted
}
if apimeta.IsStatusConditionTrue(w.Status.Conditions, kueue.WorkloadAdmitted) {
return cancellingAdmission
return admitted
}

return pending
}

Expand Down
13 changes: 12 additions & 1 deletion pkg/scheduler/preemption/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package preemption

import (
"context"
"fmt"
"sort"
"sync/atomic"
"time"
Expand Down Expand Up @@ -128,7 +129,17 @@ func (p *Preemptor) issuePreemptions(ctx context.Context, targets []*workload.In
defer cancel()
workqueue.ParallelizeUntil(ctx, parallelPreemptions, len(targets), func(i int) {
target := targets[i]
err := p.applyPreemption(ctx, workload.BaseSSAWorkload(target.Obj))
s := workload.FindConditionIndex(&target.Obj.Status, kueue.WorkloadAdmitted)
if s != -1 {
target.Obj.Status.Conditions[s] = metav1.Condition{
Type: kueue.WorkloadAdmitted,
Status: metav1.ConditionFalse,
LastTransitionTime: metav1.Now(),
Reason: "Preemption",
Message: fmt.Sprintf("Preempted by ClusterQueue %s", target.Obj.Status.Admission.ClusterQueue),
}
}
err := p.applyPreemption(ctx, workload.AdmissionPatch(target.Obj))
if err != nil {
errCh.SendErrorWithCancel(err, cancel)
return
Expand Down
7 changes: 7 additions & 0 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,13 @@ func (s *Scheduler) admit(ctx context.Context, e *entry) error {
if err := s.cache.AssumeWorkload(newWorkload); err != nil {
return err
}
newWorkload.Status.Conditions = append(newWorkload.Status.Conditions, metav1.Condition{
Type: kueue.WorkloadAdmitted,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.Now(),
Reason: "Admission Set",
Message: fmt.Sprintf("Admitted by ClusterQueue %s", newWorkload.Status.Admission.ClusterQueue),
})
e.status = assumed
log.V(2).Info("Workload assumed in the cache")

Expand Down
8 changes: 8 additions & 0 deletions pkg/workload/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,5 +259,13 @@ func BaseSSAWorkload(w *kueue.Workload) *kueue.Workload {
// AdmissionPatch returns a workload built from BaseSSAWorkload(w) that also
// carries a deep copy of w's Status.Admission. When w has a WorkloadAdmitted
// condition (as located by FindConditionIndex), the returned workload's
// Status.Conditions is set to a slice holding only that condition; otherwise
// the conditions are left as BaseSSAWorkload produced them.
func AdmissionPatch(w *kueue.Workload) *kueue.Workload {
	patch := BaseSSAWorkload(w)
	patch.Status.Admission = w.Status.Admission.DeepCopy()

	// Carry over the Admitted condition alone, if the source workload has one.
	if idx := FindConditionIndex(&w.Status, kueue.WorkloadAdmitted); idx != -1 {
		patch.Status.Conditions = []metav1.Condition{w.Status.Conditions[idx]}
	}
	return patch
}

0 comments on commit b880723

Please sign in to comment.