Skip to content

Commit

Permalink
603 admitted condition swap complexity (#654)
Browse files Browse the repository at this point in the history
* ssa admit condition

* Linter fix

* Changed admission check

* Fixes, code structure and other

* race condition fixes and small refactoring

* Notify Wl update changes

* Added admission checking on nominate

* added unit test for workload IsAssumedOrAdmitted in cache
  • Loading branch information
mcariatm authored Apr 4, 2023
1 parent 5ae0353 commit 5227f60
Show file tree
Hide file tree
Showing 12 changed files with 246 additions and 97 deletions.
16 changes: 16 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,22 @@ func (c *Cache) DeleteWorkload(w *kueue.Workload) error {
return nil
}

// IsAssumedOrAdmittedWorkload reports whether the given workload is either
// tracked in the cache's assumed-workloads map or already registered as
// admitted in its ClusterQueue.
func (c *Cache) IsAssumedOrAdmittedWorkload(w workload.Info) bool {
	c.Lock()
	defer c.Unlock()

	key := workload.Key(w.Obj)
	if _, ok := c.assumedWorkloads[key]; ok {
		return true
	}
	cq, ok := c.clusterQueues[w.ClusterQueue]
	if !ok {
		return false
	}
	_, admitted := cq.Workloads[key]
	return admitted
}

func (c *Cache) AssumeWorkload(w *kueue.Workload) error {
c.Lock()
defer c.Unlock()
Expand Down
114 changes: 114 additions & 0 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2082,6 +2082,120 @@ func TestCachePodsReadyForAllAdmittedWorkloads(t *testing.T) {
}
}

// TestIsAssumedOrAdmittedCheckWorkload verifies that a workload is reported
// as assumed when present in the cache's assumedWorkloads map, and as
// admitted when registered in its ClusterQueue's Workloads map.
func TestIsAssumedOrAdmittedCheckWorkload(t *testing.T) {
	tests := []struct {
		name     string
		cache    *Cache
		workload workload.Info
		expected bool
	}{
		{
			name: "Workload Is Assumed and not Admitted",
			cache: &Cache{
				assumedWorkloads: map[string]string{"workload_namespace/workload_name": "test", "test2": "test2"},
			},
			workload: workload.Info{
				ClusterQueue: "ClusterQueue1",
				Obj: &kueue.Workload{
					ObjectMeta: metav1.ObjectMeta{
						Name:      "workload_name",
						Namespace: "workload_namespace",
					},
				},
			},
			expected: true,
		}, {
			name: "Workload Is not Assumed but is Admitted",
			cache: &Cache{
				clusterQueues: map[string]*ClusterQueue{
					"ClusterQueue1": {
						Name: "ClusterQueue1",
						Workloads: map[string]*workload.Info{"workload_namespace/workload_name": {
							Obj: &kueue.Workload{
								ObjectMeta: metav1.ObjectMeta{
									Name:      "workload_name",
									Namespace: "workload_namespace",
								},
							},
						}},
					}},
			},

			workload: workload.Info{
				ClusterQueue: "ClusterQueue1",
				Obj: &kueue.Workload{
					ObjectMeta: metav1.ObjectMeta{
						Name:      "workload_name",
						Namespace: "workload_namespace",
					},
				},
			},
			expected: true,
		}, {
			name: "Workload Is Assumed and Admitted",
			cache: &Cache{
				clusterQueues: map[string]*ClusterQueue{
					"ClusterQueue1": {
						Name: "ClusterQueue1",
						Workloads: map[string]*workload.Info{"workload_namespace/workload_name": {
							Obj: &kueue.Workload{
								ObjectMeta: metav1.ObjectMeta{
									Name:      "workload_name",
									Namespace: "workload_namespace",
								},
							},
						}},
					}},
				assumedWorkloads: map[string]string{"workload_namespace/workload_name": "test", "test2": "test2"},
			},
			workload: workload.Info{
				ClusterQueue: "ClusterQueue1",
				Obj: &kueue.Workload{
					ObjectMeta: metav1.ObjectMeta{
						Name:      "workload_name",
						Namespace: "workload_namespace",
					},
				},
			},
			expected: true,
		}, {
			name: "Workload Is not Assumed and is not Admitted",
			cache: &Cache{
				clusterQueues: map[string]*ClusterQueue{
					"ClusterQueue1": {
						Name: "ClusterQueue1",
						Workloads: map[string]*workload.Info{"workload_namespace2/workload_name2": {
							Obj: &kueue.Workload{
								ObjectMeta: metav1.ObjectMeta{
									Name:      "workload_name2",
									Namespace: "workload_namespace2",
								},
							},
						}},
					}},
			},
			workload: workload.Info{
				ClusterQueue: "ClusterQueue1",
				Obj: &kueue.Workload{
					ObjectMeta: metav1.ObjectMeta{
						Name:      "workload_name",
						Namespace: "workload_namespace",
					},
				},
			},
			expected: false,
		},
	}
	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			// Report got/want instead of a bare error so failures are actionable.
			if got := tc.cache.IsAssumedOrAdmittedWorkload(tc.workload); got != tc.expected {
				t.Errorf("IsAssumedOrAdmittedWorkload() = %v, want %v", got, tc.expected)
			}
		})
	}
}

func messageOrEmpty(err error) string {
if err == nil {
return ""
Expand Down
13 changes: 11 additions & 2 deletions pkg/controller/core/clusterqueue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,17 @@ func (r *ClusterQueueReconciler) Reconcile(ctx context.Context, req ctrl.Request
return ctrl.Result{}, nil
}

func (r *ClusterQueueReconciler) NotifyWorkloadUpdate(w *kueue.Workload) {
r.wlUpdateCh <- event.GenericEvent{Object: w}
func (r *ClusterQueueReconciler) NotifyWorkloadUpdate(oldWl, newWl *kueue.Workload) {
if oldWl != nil {
r.wlUpdateCh <- event.GenericEvent{Object: oldWl}
if newWl != nil && oldWl.Spec.QueueName != newWl.Spec.QueueName {
r.wlUpdateCh <- event.GenericEvent{Object: newWl}
}
return
}
if newWl != nil {
r.wlUpdateCh <- event.GenericEvent{Object: newWl}
}
}

func (r *ClusterQueueReconciler) notifyWatchers(oldCQ, newCQ *kueue.ClusterQueue) {
Expand Down
13 changes: 11 additions & 2 deletions pkg/controller/core/localqueue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,17 @@ func NewLocalQueueReconciler(client client.Client, queues *queue.Manager, cache
}
}

func (r *LocalQueueReconciler) NotifyWorkloadUpdate(w *kueue.Workload) {
r.wlUpdateCh <- event.GenericEvent{Object: w}
func (r *LocalQueueReconciler) NotifyWorkloadUpdate(oldWl, newWl *kueue.Workload) {
if oldWl != nil {
r.wlUpdateCh <- event.GenericEvent{Object: oldWl}
if newWl != nil && oldWl.Spec.QueueName != newWl.Spec.QueueName {
r.wlUpdateCh <- event.GenericEvent{Object: newWl}
}
return
}
if newWl != nil {
r.wlUpdateCh <- event.GenericEvent{Object: newWl}
}
}

//+kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update
Expand Down
96 changes: 31 additions & 65 deletions pkg/controller/core/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/queue"
"sigs.k8s.io/kueue/pkg/util/limitrange"
Expand All @@ -49,20 +48,7 @@ const (
// statuses for logging purposes
pending = "pending"
admitted = "admitted"
// The cancellingAdmission status means that Admission=nil, but the workload
// still has the Admitted=True condition.
//
// This is a transient state as the workload controller is about the set the
// Admitted condition to False, transitioning the workload into the pending
// status. Only once the workload reaches the pending status is requeued
// so that scheduler can re-admit it.
//
// Cancellation of admission can be triggered in the following scenarios:
// - exceeding the timeout to reach the PodsReady=True condition when the waitForPodsReady configuration is enabled
// - workload preemption
// - clearing of the Admission field manually via API
cancellingAdmission = "cancellingAdmission"
finished = "finished"
finished = "finished"
)

var (
Expand Down Expand Up @@ -95,7 +81,7 @@ func WithWorkloadUpdateWatchers(value ...WorkloadUpdateWatcher) Option {
var defaultOptions = options{}

// WorkloadUpdateWatcher is notified of workload transitions: oldWl is nil on
// creation, newWl is nil on deletion, and both are non-nil on an update.
//
// The scraped diff left the pre-change single-argument signature duplicated
// here; only the two-argument form is kept.
type WorkloadUpdateWatcher interface {
	NotifyWorkloadUpdate(oldWl, newWl *kueue.Workload)
}

// WorkloadReconciler reconciles a Workload object
Expand Down Expand Up @@ -141,41 +127,31 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
ctx = ctrl.LoggerInto(ctx, log)
log.V(2).Info("Reconciling Workload")

status := workloadStatus(&wl)
switch status {
case pending:
if !r.queues.QueueForWorkloadExists(&wl) {
err := workload.UpdateStatusIfChanged(ctx, r.client, &wl, kueue.WorkloadAdmitted, metav1.ConditionFalse,
"Inadmissible", fmt.Sprintf("LocalQueue %s doesn't exist", wl.Spec.QueueName), constants.AdmissionName)
return ctrl.Result{}, client.IgnoreNotFound(err)
}
if apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) {
return ctrl.Result{}, nil
}
if apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadAdmitted) {
return r.reconcileNotReadyTimeout(ctx, req, &wl)
}

cqName, cqOk := r.queues.ClusterQueueForWorkload(&wl)
if !cqOk {
err := workload.UpdateStatusIfChanged(ctx, r.client, &wl, kueue.WorkloadAdmitted, metav1.ConditionFalse,
"Inadmissible", fmt.Sprintf("ClusterQueue %s doesn't exist", cqName), constants.AdmissionName)
return ctrl.Result{}, client.IgnoreNotFound(err)
}
if !r.queues.QueueForWorkloadExists(&wl) {
err := workload.UnsetAdmissionWithCondition(ctx, r.client, &wl,
"Inadmissible", fmt.Sprintf("LocalQueue %s doesn't exist", wl.Spec.QueueName))
return ctrl.Result{}, client.IgnoreNotFound(err)
}

if !r.cache.ClusterQueueActive(cqName) {
err := workload.UpdateStatusIfChanged(ctx, r.client, &wl, kueue.WorkloadAdmitted, metav1.ConditionFalse,
"Inadmissible", fmt.Sprintf("ClusterQueue %s is inactive", cqName), constants.AdmissionName)
return ctrl.Result{}, client.IgnoreNotFound(err)
}
case cancellingAdmission:
err := workload.UpdateStatusIfChanged(ctx, r.client, &wl, kueue.WorkloadAdmitted, metav1.ConditionFalse,
"AdmissionCancelled", "Admission cancelled", constants.AdmissionName)
cqName, cqOk := r.queues.ClusterQueueForWorkload(&wl)
if !cqOk {
err := workload.UnsetAdmissionWithCondition(ctx, r.client, &wl,
"Inadmissible", fmt.Sprintf("ClusterQueue %s doesn't exist", cqName))
return ctrl.Result{}, client.IgnoreNotFound(err)
case admitted:
if apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadAdmitted) {
return r.reconcileNotReadyTimeout(ctx, req, &wl)
} else {
msg := fmt.Sprintf("Admitted by ClusterQueue %s", wl.Status.Admission.ClusterQueue)
err := workload.UpdateStatusIfChanged(ctx, r.client, &wl, kueue.WorkloadAdmitted, metav1.ConditionTrue, "AdmissionByKueue", msg, constants.AdmissionName)
return ctrl.Result{}, client.IgnoreNotFound(err)
}
}

if !r.cache.ClusterQueueActive(cqName) {
err := workload.UnsetAdmissionWithCondition(ctx, r.client, &wl,
"Inadmissible", fmt.Sprintf("ClusterQueue %s is inactive", cqName))
return ctrl.Result{}, client.IgnoreNotFound(err)
}
return ctrl.Result{}, nil
}

Expand All @@ -189,7 +165,8 @@ func (r *WorkloadReconciler) reconcileNotReadyTimeout(ctx context.Context, req c
return ctrl.Result{RequeueAfter: recheckAfter}, nil
} else {
klog.V(2).InfoS("Cancelling admission of the workload due to exceeding the PodsReady timeout", "workload", req.NamespacedName.String())
err := r.client.Status().Patch(ctx, workload.BaseSSAWorkload(wl), client.Apply, client.FieldOwner(constants.AdmissionName))
err := workload.UnsetAdmissionWithCondition(ctx, r.client, wl,
"Evicted", fmt.Sprintf("Exceeded the PodsReady timeout %s", req.NamespacedName.String()))
return ctrl.Result{}, client.IgnoreNotFound(err)
}
}
Expand All @@ -200,7 +177,7 @@ func (r *WorkloadReconciler) Create(e event.CreateEvent) bool {
// this event will be handled by the LimitRange/RuntimeClass handle
return true
}
defer r.notifyWatchers(wl)
defer r.notifyWatchers(nil, wl)
status := workloadStatus(wl)
log := r.log.WithValues("workload", klog.KObj(wl), "queue", wl.Spec.QueueName, "status", status)
log.V(2).Info("Workload create event")
Expand Down Expand Up @@ -231,7 +208,7 @@ func (r *WorkloadReconciler) Delete(e event.DeleteEvent) bool {
// this event will be handled by the LimitRange/RuntimeClass handle
return true
}
defer r.notifyWatchers(wl)
defer r.notifyWatchers(wl, nil)
status := "unknown"
if !e.DeleteStateUnknown {
status = workloadStatus(wl)
Expand Down Expand Up @@ -272,8 +249,7 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {
return true
}
wl := e.ObjectNew.(*kueue.Workload)
defer r.notifyWatchers(oldWl)
defer r.notifyWatchers(wl)
defer r.notifyWatchers(oldWl, wl)

status := workloadStatus(wl)
log := r.log.WithValues("workload", klog.KObj(wl), "queue", wl.Spec.QueueName, "status", status)
Expand Down Expand Up @@ -324,14 +300,7 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {
if !r.cache.AddOrUpdateWorkload(wlCopy) {
log.V(2).Info("ClusterQueue for workload didn't exist; ignored for now")
}
case prevStatus == admitted && status == cancellingAdmission:
// The workload will be requeued when handling transitioning
// from cancellingAdmission to pending.
// When the workload is in the cancellingAdmission status, the only
// transition possible, triggered by the workload controller, is to the
// pending status. Scheduler is only able to re-admit the workload once
// requeued after reaching the pending status.
case prevStatus == cancellingAdmission && status == pending:
case prevStatus == admitted && status == pending:
// trigger the move of associated inadmissibleWorkloads, if there are any.
r.queues.QueueAssociatedInadmissibleWorkloadsAfter(ctx, wl, func() {
// Delete the workload from cache while holding the queues lock
Expand Down Expand Up @@ -361,9 +330,9 @@ func (r *WorkloadReconciler) Generic(e event.GenericEvent) bool {
return false
}

func (r *WorkloadReconciler) notifyWatchers(wl *kueue.Workload) {
func (r *WorkloadReconciler) notifyWatchers(oldWl, newWl *kueue.Workload) {
for _, w := range r.watchers {
w.NotifyWorkloadUpdate(wl)
w.NotifyWorkloadUpdate(oldWl, newWl)
}
}

Expand Down Expand Up @@ -419,11 +388,8 @@ func workloadStatus(w *kueue.Workload) string {
if apimeta.IsStatusConditionTrue(w.Status.Conditions, kueue.WorkloadFinished) {
return finished
}
if w.Status.Admission != nil {
return admitted
}
if apimeta.IsStatusConditionTrue(w.Status.Conditions, kueue.WorkloadAdmitted) {
return cancellingAdmission
return admitted
}
return pending
}
Expand Down
10 changes: 9 additions & 1 deletion pkg/scheduler/preemption/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,15 @@ func (p *Preemptor) issuePreemptions(ctx context.Context, targets []*workload.In
defer cancel()
workqueue.ParallelizeUntil(ctx, parallelPreemptions, len(targets), func(i int) {
target := targets[i]
err := p.applyPreemption(ctx, workload.BaseSSAWorkload(target.Obj))
patch := workload.BaseSSAWorkload(target.Obj)
patch.Status.Conditions = []metav1.Condition{{
Type: kueue.WorkloadAdmitted,
Status: metav1.ConditionFalse,
LastTransitionTime: metav1.Now(),
Reason: "Preempted",
Message: "Preempted to accommodate a higher priority Workload",
}}
err := p.applyPreemption(ctx, patch)
if err != nil {
errCh.SendErrorWithCancel(err, cancel)
return
Expand Down
Loading

0 comments on commit 5227f60

Please sign in to comment.