Skip to content

Commit

Permalink
Merge pull request #4555 from whitewindmills/deprioritized-preemption
Browse files Browse the repository at this point in the history
Optimize deprioritized policy preemption logic
  • Loading branch information
karmada-bot authored Mar 1, 2024
2 parents a03aa84 + e88797b commit 2cfa7e6
Show file tree
Hide file tree
Showing 14 changed files with 1,141 additions and 19 deletions.
88 changes: 69 additions & 19 deletions pkg/detector/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package detector

import (
pq "github.com/emirpasic/gods/queues/priorityqueue"
godsutils "github.com/emirpasic/gods/utils"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand All @@ -33,6 +35,13 @@ import (
"github.com/karmada-io/karmada/pkg/util/names"
)

// PriorityKey is the unique propagation policy key with priority.
type PriorityKey struct {
util.QueueKey
// Priority is the priority of the propagation policy.
Priority int32
}

// preemptionEnabled checks if preemption is enabled.
func preemptionEnabled(preemption policyv1alpha1.PreemptionBehavior) bool {
if preemption != policyv1alpha1.PreemptAlways {
Expand Down Expand Up @@ -257,10 +266,10 @@ func (d *ResourceDetector) HandleDeprioritizedPropagationPolicy(oldPolicy policy
return
}

// TODO(@RainbowMango): Should sort the listed policies to ensure the
// Use the priority queue to sort the listed policies to ensure the
// higher priority PropagationPolicy be process first to avoid possible
// multiple preemption.

var sortedPotentialKeys *pq.Queue
for i := range policies {
var potentialPolicy policyv1alpha1.PropagationPolicy
if err = helper.ConvertToTypedObject(policies[i], &potentialPolicy); err != nil {
Expand All @@ -277,35 +286,43 @@ func (d *ResourceDetector) HandleDeprioritizedPropagationPolicy(oldPolicy policy
potentialPolicy.Spec.Preemption == policyv1alpha1.PreemptAlways &&
potentialPolicy.ExplicitPriority() > newPolicy.ExplicitPriority() &&
potentialPolicy.ExplicitPriority() < oldPolicy.ExplicitPriority() {
var potentialKey util.QueueKey
potentialKey, err = ClusterWideKeyFunc(&potentialPolicy)
potentialKey, err := ClusterWideKeyFunc(&potentialPolicy)
if err != nil {
return
klog.Errorf("Failed to convert PropagationPolicy to queued key: %v", err)
continue
}

klog.Infof("Enqueuing PropagationPolicy(%s/%s) in case of PropagationPolicy(%s/%s) priority changes", potentialPolicy.GetNamespace(), potentialPolicy.GetName(), newPolicy.GetNamespace(), newPolicy.GetName())
d.policyReconcileWorker.Add(potentialKey)
if sortedPotentialKeys == nil {
sortedPotentialKeys = pq.NewWith(priorityDescendingComparator)
}

sortedPotentialKeys.Enqueue(&PriorityKey{
QueueKey: potentialKey,
Priority: potentialPolicy.ExplicitPriority(),
})
}
}
requeuePotentialKeys(sortedPotentialKeys, d.policyReconcileWorker)
}

// HandleDeprioritizedClusterPropagationPolicy responses to priority change of a ClusterPropagationPolicy,
// if the change is from high priority (e.g. 5) to low priority(e.g. 3), it will
// check if there is another ClusterPropagationPolicy could preempt the targeted resource,
// and put the ClusterPropagationPolicy in the queue to trigger preemption.
func (d *ResourceDetector) HandleDeprioritizedClusterPropagationPolicy(oldPolicy policyv1alpha1.ClusterPropagationPolicy, newPolicy policyv1alpha1.ClusterPropagationPolicy) {
klog.Infof("ClusterPropagationPolicy(%s/%s) priority changed from %d to %d",
newPolicy.GetNamespace(), newPolicy.GetName(), *oldPolicy.Spec.Priority, *newPolicy.Spec.Priority)

policies, err := d.clusterPropagationPolicyLister.ByNamespace(newPolicy.GetNamespace()).List(labels.Everything())
klog.Infof("ClusterPropagationPolicy(%s) priority changed from %d to %d",
newPolicy.GetName(), *oldPolicy.Spec.Priority, *newPolicy.Spec.Priority)
policies, err := d.clusterPropagationPolicyLister.List(labels.Everything())
if err != nil {
klog.Errorf("Failed to list ClusterPropagationPolicy from namespace: %s, error: %v", newPolicy.GetNamespace(), err)
klog.Errorf("Failed to list ClusterPropagationPolicy, error: %v", err)
return
}

// TODO(@RainbowMango): Should sort the listed policies to ensure the
// Use the priority queue to sort the listed policies to ensure the
// higher priority ClusterPropagationPolicy be process first to avoid possible
// multiple preemption.

var sortedPotentialKeys *pq.Queue
for i := range policies {
var potentialPolicy policyv1alpha1.ClusterPropagationPolicy
if err = helper.ConvertToTypedObject(policies[i], &potentialPolicy); err != nil {
Expand All @@ -322,14 +339,47 @@ func (d *ResourceDetector) HandleDeprioritizedClusterPropagationPolicy(oldPolicy
potentialPolicy.Spec.Preemption == policyv1alpha1.PreemptAlways &&
potentialPolicy.ExplicitPriority() > newPolicy.ExplicitPriority() &&
potentialPolicy.ExplicitPriority() < oldPolicy.ExplicitPriority() {
var potentialKey util.QueueKey
potentialKey, err = ClusterWideKeyFunc(&potentialPolicy)
potentialKey, err := ClusterWideKeyFunc(&potentialPolicy)
if err != nil {
return
klog.Errorf("Failed to convert ClusterPropagationPolicy to queued key: %v", err)
continue
}
klog.Infof("Enqueuing ClusterPropagationPolicy(%s/%s) in case of ClusterPropagationPolicy(%s/%s) priority changes",
potentialPolicy.GetNamespace(), potentialPolicy.GetName(), newPolicy.GetNamespace(), newPolicy.GetName())
d.clusterPolicyReconcileWorker.Add(potentialKey)

klog.Infof("Enqueuing ClusterPropagationPolicy(%s) in case of ClusterPropagationPolicy(%s) priority changes",
potentialPolicy.GetName(), newPolicy.GetName())
if sortedPotentialKeys == nil {
sortedPotentialKeys = pq.NewWith(priorityDescendingComparator)
}

sortedPotentialKeys.Enqueue(&PriorityKey{
QueueKey: potentialKey,
Priority: potentialPolicy.ExplicitPriority(),
})
}
}
requeuePotentialKeys(sortedPotentialKeys, d.clusterPolicyReconcileWorker)
}

// requeuePotentialKeys re-queues potential policy keys.
func requeuePotentialKeys(sortedPotentialKeys *pq.Queue, worker util.AsyncWorker) {
// No suitable policy key to re-queue.
if sortedPotentialKeys == nil {
return
}

for {
key, ok := sortedPotentialKeys.Dequeue()
if !ok {
break
}

worker.Add(key.(*PriorityKey).QueueKey)
}
}

// priorityDescendingComparator provides a basic descending comparison on policy priority.
func priorityDescendingComparator(a, b interface{}) int {
aPriority := a.(*PriorityKey).Priority
bPriority := b.(*PriorityKey).Priority
return godsutils.Int32Comparator(bPriority, aPriority)
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 2cfa7e6

Please sign in to comment.