Skip to content

Commit

Permalink
Optimize deprioritized policy preemption logic
Browse files Browse the repository at this point in the history
Signed-off-by: whitewindmills <[email protected]>
  • Loading branch information
whitewindmills committed Jan 22, 2024
1 parent 34c4d04 commit 9dc6be7
Showing 1 changed file with 73 additions and 9 deletions.
82 changes: 73 additions & 9 deletions pkg/detector/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package detector

import (
rbt "github.com/emirpasic/gods/trees/redblacktree"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -257,10 +258,10 @@ func (d *ResourceDetector) HandleDeprioritizedPropagationPolicy(oldPolicy policy
return
}

// TODO(@RainbowMango): Should sort the listed policies to ensure the
// Use red-black tree to sort the listed policies to ensure the
// higher priority PropagationPolicy be process first to avoid possible
// multiple preemption.

var sortedPotentialKeys *rbt.Tree
for i := range policies {
var potentialPolicy policyv1alpha1.PropagationPolicy
if err = helper.ConvertToTypedObject(policies[i], &potentialPolicy); err != nil {
Expand All @@ -282,7 +283,32 @@ func (d *ResourceDetector) HandleDeprioritizedPropagationPolicy(oldPolicy policy
if err != nil {
return
}

klog.Infof("Enqueuing PropagationPolicy(%s/%s) in case of PropagationPolicy(%s/%s) priority changes", potentialPolicy.GetNamespace(), potentialPolicy.GetName(), newPolicy.GetNamespace(), newPolicy.GetName())
if sortedPotentialKeys == nil {
sortedPotentialKeys = rbt.NewWith(int32DescendingComparator)
}

rawSamePriorityPolicyKeys, _ := sortedPotentialKeys.Get(potentialPolicy.ExplicitPriority())
var samePriorityPolicyKeys []util.QueueKey
if rawSamePriorityPolicyKeys != nil {
samePriorityPolicyKeys = rawSamePriorityPolicyKeys.([]util.QueueKey)
}

samePriorityPolicyKeys = append(samePriorityPolicyKeys, potentialKey)
sortedPotentialKeys.Put(potentialPolicy.ExplicitPriority(), samePriorityPolicyKeys)
}
}

// No suitable policy to re-queue.
if sortedPotentialKeys == nil {
return
}

itr := sortedPotentialKeys.Iterator()
for itr.Next() {
samePriorityPolicyKeys := itr.Value().([]util.QueueKey)
for _, potentialKey := range samePriorityPolicyKeys {
d.policyReconcileWorker.Add(potentialKey)
}
}
Expand All @@ -293,19 +319,18 @@ func (d *ResourceDetector) HandleDeprioritizedPropagationPolicy(oldPolicy policy
// check if there is another ClusterPropagationPolicy could preempt the targeted resource,
// and put the ClusterPropagationPolicy in the queue to trigger preemption.
func (d *ResourceDetector) HandleDeprioritizedClusterPropagationPolicy(oldPolicy policyv1alpha1.ClusterPropagationPolicy, newPolicy policyv1alpha1.ClusterPropagationPolicy) {
klog.Infof("ClusterPropagationPolicy(%s/%s) priority changed from %d to %d",
newPolicy.GetNamespace(), newPolicy.GetName(), *oldPolicy.Spec.Priority, *newPolicy.Spec.Priority)

policies, err := d.clusterPropagationPolicyLister.ByNamespace(newPolicy.GetNamespace()).List(labels.Everything())
klog.Infof("ClusterPropagationPolicy(%s) priority changed from %d to %d",
newPolicy.GetName(), *oldPolicy.Spec.Priority, *newPolicy.Spec.Priority)
policies, err := d.clusterPropagationPolicyLister.List(labels.Everything())
if err != nil {
klog.Errorf("Failed to list ClusterPropagationPolicy from namespace: %s, error: %v", newPolicy.GetNamespace(), err)
klog.Errorf("Failed to list ClusterPropagationPolicy, error: %v", err)
return
}

// TODO(@RainbowMango): Should sort the listed policies to ensure the
// Use red-black tree to sort the listed policies to ensure the
// higher priority ClusterPropagationPolicy be process first to avoid possible
// multiple preemption.

var sortedPotentialKeys *rbt.Tree
for i := range policies {
var potentialPolicy policyv1alpha1.ClusterPropagationPolicy
if err = helper.ConvertToTypedObject(policies[i], &potentialPolicy); err != nil {
Expand All @@ -327,9 +352,48 @@ func (d *ResourceDetector) HandleDeprioritizedClusterPropagationPolicy(oldPolicy
if err != nil {
return
}

klog.Infof("Enqueuing ClusterPropagationPolicy(%s/%s) in case of ClusterPropagationPolicy(%s/%s) priority changes",
potentialPolicy.GetNamespace(), potentialPolicy.GetName(), newPolicy.GetNamespace(), newPolicy.GetName())
if sortedPotentialKeys == nil {
sortedPotentialKeys = rbt.NewWith(int32DescendingComparator)
}

rawSamePriorityPolicyKeys, _ := sortedPotentialKeys.Get(potentialPolicy.ExplicitPriority())
var samePriorityPolicyKeys []util.QueueKey
if rawSamePriorityPolicyKeys != nil {
samePriorityPolicyKeys = rawSamePriorityPolicyKeys.([]util.QueueKey)
}

samePriorityPolicyKeys = append(samePriorityPolicyKeys, potentialKey)
sortedPotentialKeys.Put(potentialPolicy.ExplicitPriority(), samePriorityPolicyKeys)
}
}

// No suitable policy to re-queue.
if sortedPotentialKeys == nil {
return
}

itr := sortedPotentialKeys.Iterator()
for itr.Next() {
samePriorityPolicyKeys := itr.Value().([]util.QueueKey)
for _, potentialKey := range samePriorityPolicyKeys {
d.clusterPolicyReconcileWorker.Add(potentialKey)
}
}
}

// int32DescendingComparator provides a basic descending comparison on int32.
func int32DescendingComparator(a, b interface{}) int {
aAsserted := a.(int32)
bAsserted := b.(int32)
switch {
case aAsserted > bAsserted:
return -1
case aAsserted < bAsserted:
return 1
default:
return 0
}
}

0 comments on commit 9dc6be7

Please sign in to comment.