Skip to content

Commit

Permalink
Optimize deprioritized policy preemption logic
Browse files Browse the repository at this point in the history
Use the priorityequeue to sort the listed policies to avoid multiple preemption.

Signed-off-by: whitewindmills <[email protected]>
  • Loading branch information
whitewindmills committed Feb 29, 2024
1 parent 34c4d04 commit e88797b
Show file tree
Hide file tree
Showing 14 changed files with 1,141 additions and 19 deletions.
88 changes: 69 additions & 19 deletions pkg/detector/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package detector

import (
pq "github.com/emirpasic/gods/queues/priorityqueue"
godsutils "github.com/emirpasic/gods/utils"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand All @@ -33,6 +35,13 @@ import (
"github.com/karmada-io/karmada/pkg/util/names"
)

// PriorityKey is the unique propagation policy key with priority.
type PriorityKey struct {
util.QueueKey
// Priority is the priority of the propagation policy.
Priority int32
}

// preemptionEnabled checks if preemption is enabled.
func preemptionEnabled(preemption policyv1alpha1.PreemptionBehavior) bool {
if preemption != policyv1alpha1.PreemptAlways {
Expand Down Expand Up @@ -257,10 +266,10 @@ func (d *ResourceDetector) HandleDeprioritizedPropagationPolicy(oldPolicy policy
return
}

// TODO(@RainbowMango): Should sort the listed policies to ensure the
// Use the priority queue to sort the listed policies to ensure the
// higher priority PropagationPolicy be process first to avoid possible
// multiple preemption.

var sortedPotentialKeys *pq.Queue
for i := range policies {
var potentialPolicy policyv1alpha1.PropagationPolicy
if err = helper.ConvertToTypedObject(policies[i], &potentialPolicy); err != nil {
Expand All @@ -277,35 +286,43 @@ func (d *ResourceDetector) HandleDeprioritizedPropagationPolicy(oldPolicy policy
potentialPolicy.Spec.Preemption == policyv1alpha1.PreemptAlways &&
potentialPolicy.ExplicitPriority() > newPolicy.ExplicitPriority() &&
potentialPolicy.ExplicitPriority() < oldPolicy.ExplicitPriority() {
var potentialKey util.QueueKey
potentialKey, err = ClusterWideKeyFunc(&potentialPolicy)
potentialKey, err := ClusterWideKeyFunc(&potentialPolicy)
if err != nil {
return
klog.Errorf("Failed to convert PropagationPolicy to queued key: %v", err)
continue
}

klog.Infof("Enqueuing PropagationPolicy(%s/%s) in case of PropagationPolicy(%s/%s) priority changes", potentialPolicy.GetNamespace(), potentialPolicy.GetName(), newPolicy.GetNamespace(), newPolicy.GetName())
d.policyReconcileWorker.Add(potentialKey)
if sortedPotentialKeys == nil {
sortedPotentialKeys = pq.NewWith(priorityDescendingComparator)
}

sortedPotentialKeys.Enqueue(&PriorityKey{
QueueKey: potentialKey,
Priority: potentialPolicy.ExplicitPriority(),
})
}
}
requeuePotentialKeys(sortedPotentialKeys, d.policyReconcileWorker)
}

// HandleDeprioritizedClusterPropagationPolicy responses to priority change of a ClusterPropagationPolicy,
// if the change is from high priority (e.g. 5) to low priority(e.g. 3), it will
// check if there is another ClusterPropagationPolicy could preempt the targeted resource,
// and put the ClusterPropagationPolicy in the queue to trigger preemption.
func (d *ResourceDetector) HandleDeprioritizedClusterPropagationPolicy(oldPolicy policyv1alpha1.ClusterPropagationPolicy, newPolicy policyv1alpha1.ClusterPropagationPolicy) {
klog.Infof("ClusterPropagationPolicy(%s/%s) priority changed from %d to %d",
newPolicy.GetNamespace(), newPolicy.GetName(), *oldPolicy.Spec.Priority, *newPolicy.Spec.Priority)

policies, err := d.clusterPropagationPolicyLister.ByNamespace(newPolicy.GetNamespace()).List(labels.Everything())
klog.Infof("ClusterPropagationPolicy(%s) priority changed from %d to %d",
newPolicy.GetName(), *oldPolicy.Spec.Priority, *newPolicy.Spec.Priority)
policies, err := d.clusterPropagationPolicyLister.List(labels.Everything())
if err != nil {
klog.Errorf("Failed to list ClusterPropagationPolicy from namespace: %s, error: %v", newPolicy.GetNamespace(), err)
klog.Errorf("Failed to list ClusterPropagationPolicy, error: %v", err)
return
}

// TODO(@RainbowMango): Should sort the listed policies to ensure the
// Use the priority queue to sort the listed policies to ensure the
// higher priority ClusterPropagationPolicy be process first to avoid possible
// multiple preemption.

var sortedPotentialKeys *pq.Queue
for i := range policies {
var potentialPolicy policyv1alpha1.ClusterPropagationPolicy
if err = helper.ConvertToTypedObject(policies[i], &potentialPolicy); err != nil {
Expand All @@ -322,14 +339,47 @@ func (d *ResourceDetector) HandleDeprioritizedClusterPropagationPolicy(oldPolicy
potentialPolicy.Spec.Preemption == policyv1alpha1.PreemptAlways &&
potentialPolicy.ExplicitPriority() > newPolicy.ExplicitPriority() &&
potentialPolicy.ExplicitPriority() < oldPolicy.ExplicitPriority() {
var potentialKey util.QueueKey
potentialKey, err = ClusterWideKeyFunc(&potentialPolicy)
potentialKey, err := ClusterWideKeyFunc(&potentialPolicy)
if err != nil {
return
klog.Errorf("Failed to convert ClusterPropagationPolicy to queued key: %v", err)
continue
}
klog.Infof("Enqueuing ClusterPropagationPolicy(%s/%s) in case of ClusterPropagationPolicy(%s/%s) priority changes",
potentialPolicy.GetNamespace(), potentialPolicy.GetName(), newPolicy.GetNamespace(), newPolicy.GetName())
d.clusterPolicyReconcileWorker.Add(potentialKey)

klog.Infof("Enqueuing ClusterPropagationPolicy(%s) in case of ClusterPropagationPolicy(%s) priority changes",
potentialPolicy.GetName(), newPolicy.GetName())
if sortedPotentialKeys == nil {
sortedPotentialKeys = pq.NewWith(priorityDescendingComparator)
}

sortedPotentialKeys.Enqueue(&PriorityKey{
QueueKey: potentialKey,
Priority: potentialPolicy.ExplicitPriority(),
})
}
}
requeuePotentialKeys(sortedPotentialKeys, d.clusterPolicyReconcileWorker)
}

// requeuePotentialKeys re-queues potential policy keys.
func requeuePotentialKeys(sortedPotentialKeys *pq.Queue, worker util.AsyncWorker) {
// No suitable policy key to re-queue.
if sortedPotentialKeys == nil {
return
}

for {
key, ok := sortedPotentialKeys.Dequeue()
if !ok {
break
}

worker.Add(key.(*PriorityKey).QueueKey)
}
}

// priorityDescendingComparator provides a basic descending comparison on policy priority.
func priorityDescendingComparator(a, b interface{}) int {
aPriority := a.(*PriorityKey).Priority
bPriority := b.(*PriorityKey).Priority
return godsutils.Int32Comparator(bPriority, aPriority)
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit e88797b

Please sign in to comment.