Skip to content

Commit

Permalink
Optimize deprioritized policy preemption logic
Browse files Browse the repository at this point in the history
Signed-off-by: whitewindmills <[email protected]>
  • Loading branch information
whitewindmills committed Feb 28, 2024
1 parent 34c4d04 commit efd56ec
Show file tree
Hide file tree
Showing 14 changed files with 1,127 additions and 26 deletions.
81 changes: 55 additions & 26 deletions pkg/detector/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package detector

import (
pq "github.com/emirpasic/gods/queues/priorityqueue"
godsutils "github.com/emirpasic/gods/utils"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -257,13 +259,13 @@ func (d *ResourceDetector) HandleDeprioritizedPropagationPolicy(oldPolicy policy
return
}

// TODO(@RainbowMango): Should sort the listed policies to ensure the
// Use the priority queue to sort the listed policies to ensure the
// higher priority PropagationPolicy be process first to avoid possible
// multiple preemption.

var sortedPotentialPolicies *pq.Queue
for i := range policies {
var potentialPolicy policyv1alpha1.PropagationPolicy
if err = helper.ConvertToTypedObject(policies[i], &potentialPolicy); err != nil {
var potentialPolicy *policyv1alpha1.PropagationPolicy
if err = helper.ConvertToTypedObject(policies[i], potentialPolicy); err != nil {
klog.Errorf("Failed to convert typed PropagationPolicy: %v", err)
continue
}
Expand All @@ -277,38 +279,37 @@ func (d *ResourceDetector) HandleDeprioritizedPropagationPolicy(oldPolicy policy
potentialPolicy.Spec.Preemption == policyv1alpha1.PreemptAlways &&
potentialPolicy.ExplicitPriority() > newPolicy.ExplicitPriority() &&
potentialPolicy.ExplicitPriority() < oldPolicy.ExplicitPriority() {
var potentialKey util.QueueKey
potentialKey, err = ClusterWideKeyFunc(&potentialPolicy)
if err != nil {
return
}
klog.Infof("Enqueuing PropagationPolicy(%s/%s) in case of PropagationPolicy(%s/%s) priority changes", potentialPolicy.GetNamespace(), potentialPolicy.GetName(), newPolicy.GetNamespace(), newPolicy.GetName())
d.policyReconcileWorker.Add(potentialKey)
if sortedPotentialPolicies == nil {
sortedPotentialPolicies = pq.NewWith(policyPriorityDescendingComparator)
}

sortedPotentialPolicies.Enqueue(potentialPolicy)
}
}
d.enqueuePotentialPolicies(sortedPotentialPolicies)
}

// HandleDeprioritizedClusterPropagationPolicy responses to priority change of a ClusterPropagationPolicy,
// if the change is from high priority (e.g. 5) to low priority(e.g. 3), it will
// check if there is another ClusterPropagationPolicy could preempt the targeted resource,
// and put the ClusterPropagationPolicy in the queue to trigger preemption.
func (d *ResourceDetector) HandleDeprioritizedClusterPropagationPolicy(oldPolicy policyv1alpha1.ClusterPropagationPolicy, newPolicy policyv1alpha1.ClusterPropagationPolicy) {
klog.Infof("ClusterPropagationPolicy(%s/%s) priority changed from %d to %d",
newPolicy.GetNamespace(), newPolicy.GetName(), *oldPolicy.Spec.Priority, *newPolicy.Spec.Priority)

policies, err := d.clusterPropagationPolicyLister.ByNamespace(newPolicy.GetNamespace()).List(labels.Everything())
klog.Infof("ClusterPropagationPolicy(%s) priority changed from %d to %d",
newPolicy.GetName(), *oldPolicy.Spec.Priority, *newPolicy.Spec.Priority)
policies, err := d.clusterPropagationPolicyLister.List(labels.Everything())
if err != nil {
klog.Errorf("Failed to list ClusterPropagationPolicy from namespace: %s, error: %v", newPolicy.GetNamespace(), err)
klog.Errorf("Failed to list ClusterPropagationPolicy, error: %v", err)
return
}

// TODO(@RainbowMango): Should sort the listed policies to ensure the
// Use the priority queue to sort the listed policies to ensure the
// higher priority ClusterPropagationPolicy be process first to avoid possible
// multiple preemption.

var sortedPotentialPolicies *pq.Queue
for i := range policies {
var potentialPolicy policyv1alpha1.ClusterPropagationPolicy
if err = helper.ConvertToTypedObject(policies[i], &potentialPolicy); err != nil {
var potentialPolicy *policyv1alpha1.ClusterPropagationPolicy
if err = helper.ConvertToTypedObject(policies[i], potentialPolicy); err != nil {
klog.Errorf("Failed to convert typed ClusterPropagationPolicy: %v", err)
continue
}
Expand All @@ -322,14 +323,42 @@ func (d *ResourceDetector) HandleDeprioritizedClusterPropagationPolicy(oldPolicy
potentialPolicy.Spec.Preemption == policyv1alpha1.PreemptAlways &&
potentialPolicy.ExplicitPriority() > newPolicy.ExplicitPriority() &&
potentialPolicy.ExplicitPriority() < oldPolicy.ExplicitPriority() {
var potentialKey util.QueueKey
potentialKey, err = ClusterWideKeyFunc(&potentialPolicy)
if err != nil {
return
klog.Infof("Enqueuing ClusterPropagationPolicy(%s) in case of ClusterPropagationPolicy(%s) priority changes",
potentialPolicy.GetName(), newPolicy.GetName())
if sortedPotentialPolicies == nil {
sortedPotentialPolicies = pq.NewWith(policyPriorityDescendingComparator)
}
klog.Infof("Enqueuing ClusterPropagationPolicy(%s/%s) in case of ClusterPropagationPolicy(%s/%s) priority changes",
potentialPolicy.GetNamespace(), potentialPolicy.GetName(), newPolicy.GetNamespace(), newPolicy.GetName())
d.clusterPolicyReconcileWorker.Add(potentialKey)

sortedPotentialPolicies.Enqueue(potentialPolicy)
}
}
d.enqueuePotentialPolicies(sortedPotentialPolicies)
}

// enqueuePotentialPolicies tries to enqueue potential policies.
func (d *ResourceDetector) enqueuePotentialPolicies(sortedPotentialPolicies *pq.Queue) {
// No suitable policy to re-queue.
if sortedPotentialPolicies == nil {
return
}

for {
potentialPolicy, ok := sortedPotentialPolicies.Dequeue()
if !ok {
break
}
potentialKey, err := ClusterWideKeyFunc(potentialPolicy)
if err != nil {
klog.Errorf("Failed to convert queued key: %v", err)
continue
}
d.policyReconcileWorker.Add(potentialKey)
}
}

// policyPriorityDescendingComparator provides a basic descending comparison on policy priority.
func policyPriorityDescendingComparator(a, b interface{}) int {
aPriority := a.(*policyv1alpha1.PropagationPolicy).ExplicitPriority()
bPriority := b.(*policyv1alpha1.PropagationPolicy).ExplicitPriority()
return -godsutils.Int32Comparator(aPriority, bPriority)
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit efd56ec

Please sign in to comment.