Skip to content

Commit

Permalink
Merge pull request #4219 from chaosi-zju/feat-random
Browse files Browse the repository at this point in the history
implementation : divide replicas by static weight evenly
  • Loading branch information
karmada-bot authored Nov 15, 2023
2 parents d2c0746 + 4241e88 commit 3dac48b
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 84 deletions.
2 changes: 1 addition & 1 deletion pkg/scheduler/core/assignment.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func assignByStaticWeightStrategy(state *assignState) ([]workv1alpha2.TargetClus
if state.strategy.WeightPreference == nil {
state.strategy.WeightPreference = getDefaultWeightPreference(state.candidates)
}
weightList := getStaticWeightInfoList(state.candidates, state.strategy.WeightPreference.StaticWeightList)
weightList := getStaticWeightInfoList(state.candidates, state.strategy.WeightPreference.StaticWeightList, state.spec.Clusters)

disp := helper.NewDispenser(state.spec.Replicas, nil)
disp.TakeByWeight(weightList)
Expand Down
54 changes: 37 additions & 17 deletions pkg/scheduler/core/assignment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,8 +591,9 @@ func Test_dynamicScaleUp(t *testing.T) {
candidates []*clusterv1alpha1.Cluster
object *workv1alpha2.ResourceBindingSpec
placement *policyv1alpha1.Placement
want []workv1alpha2.TargetCluster
wantErr bool
// wants specifies multi possible desired result, any one got is expected
wants [][]workv1alpha2.TargetCluster
wantErr bool
}{
{
name: "replica 12, dynamic weight 6:8:10",
Expand All @@ -616,10 +617,12 @@ func Test_dynamicScaleUp(t *testing.T) {
placement: &policyv1alpha1.Placement{
ReplicaScheduling: dynamicWeightStrategy,
},
want: []workv1alpha2.TargetCluster{
{Name: ClusterMember1, Replicas: 3},
{Name: ClusterMember2, Replicas: 4},
{Name: ClusterMember3, Replicas: 5},
wants: [][]workv1alpha2.TargetCluster{
{
{Name: ClusterMember1, Replicas: 3},
{Name: ClusterMember2, Replicas: 4},
{Name: ClusterMember3, Replicas: 5},
},
},
wantErr: false,
},
Expand All @@ -645,10 +648,17 @@ func Test_dynamicScaleUp(t *testing.T) {
placement: &policyv1alpha1.Placement{
ReplicaScheduling: dynamicWeightStrategy,
},
want: []workv1alpha2.TargetCluster{
{Name: ClusterMember1, Replicas: 4},
{Name: ClusterMember2, Replicas: 3},
{Name: ClusterMember3, Replicas: 5},
wants: [][]workv1alpha2.TargetCluster{
{
{Name: ClusterMember1, Replicas: 4},
{Name: ClusterMember2, Replicas: 3},
{Name: ClusterMember3, Replicas: 5},
},
{
{Name: ClusterMember1, Replicas: 3},
{Name: ClusterMember2, Replicas: 4},
{Name: ClusterMember3, Replicas: 5},
},
},
wantErr: false,
},
Expand Down Expand Up @@ -698,9 +708,11 @@ func Test_dynamicScaleUp(t *testing.T) {
placement: &policyv1alpha1.Placement{
ReplicaScheduling: aggregatedStrategy,
},
want: []workv1alpha2.TargetCluster{
{Name: ClusterMember2, Replicas: 5},
{Name: ClusterMember3, Replicas: 7},
wants: [][]workv1alpha2.TargetCluster{
{
{Name: ClusterMember2, Replicas: 5},
{Name: ClusterMember3, Replicas: 7},
},
},
wantErr: false,
},
Expand All @@ -726,8 +738,10 @@ func Test_dynamicScaleUp(t *testing.T) {
placement: &policyv1alpha1.Placement{
ReplicaScheduling: aggregatedStrategy,
},
want: []workv1alpha2.TargetCluster{
{Name: ClusterMember1, Replicas: 12},
wants: [][]workv1alpha2.TargetCluster{
{
{Name: ClusterMember1, Replicas: 12},
},
},
wantErr: false,
},
Expand Down Expand Up @@ -765,9 +779,15 @@ func Test_dynamicScaleUp(t *testing.T) {
t.Errorf("dynamicScaleUp() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !helper.IsScheduleResultEqual(got, tt.want) {
t.Errorf("dynamicScaleUp() got = %v, want %v", got, tt.want)
if tt.wantErr {
return
}
for _, want := range tt.wants {
if helper.IsScheduleResultEqual(got, want) {
return
}
}
t.Errorf("dynamicScaleUp() got = %v, wants %v", got, tt.wants)
})
}
}
Expand Down
15 changes: 12 additions & 3 deletions pkg/scheduler/core/division_algorithm.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,28 @@ func (a TargetClustersList) Len() int { return len(a) }
func (a TargetClustersList) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a TargetClustersList) Less(i, j int) bool { return a[i].Replicas > a[j].Replicas }

func getStaticWeightInfoList(clusters []*clusterv1alpha1.Cluster, weightList []policyv1alpha1.StaticClusterWeight) helper.ClusterWeightInfoList {
func getStaticWeightInfoList(clusters []*clusterv1alpha1.Cluster, weightList []policyv1alpha1.StaticClusterWeight,
lastTargetClusters []workv1alpha2.TargetCluster) helper.ClusterWeightInfoList {
list := make(helper.ClusterWeightInfoList, 0)
for _, cluster := range clusters {
var weight int64
var lastReplicas int32
for _, staticWeightRule := range weightList {
if util.ClusterMatches(cluster, staticWeightRule.TargetCluster) {
weight = util.MaxInt64(weight, staticWeightRule.Weight)
}
}
for _, lastTargetCluster := range lastTargetClusters {
if cluster.Name == lastTargetCluster.Name {
lastReplicas = lastTargetCluster.Replicas
break
}
}
if weight > 0 {
list = append(list, helper.ClusterWeightInfo{
ClusterName: cluster.Name,
Weight: weight,
ClusterName: cluster.Name,
Weight: weight,
LastReplicas: lastReplicas,
})
}
}
Expand Down
50 changes: 30 additions & 20 deletions pkg/util/helper/binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package helper

import (
"context"
"crypto/rand"
"math/big"
"sort"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -32,8 +34,9 @@ import (

// ClusterWeightInfo records the weight of a cluster
type ClusterWeightInfo struct {
ClusterName string
Weight int64
ClusterName string
Weight int64
LastReplicas int32
}

// ClusterWeightInfoList is a slice of ClusterWeightInfo that implements sort.Interface to sort by Value.
Expand All @@ -45,19 +48,18 @@ func (p ClusterWeightInfoList) Less(i, j int) bool {
if p[i].Weight != p[j].Weight {
return p[i].Weight > p[j].Weight
}
return p[i].ClusterName < p[j].ClusterName
}

// SortClusterByWeight sort clusters by the weight
func SortClusterByWeight(m map[string]int64) ClusterWeightInfoList {
p := make(ClusterWeightInfoList, len(m))
i := 0
for k, v := range m {
p[i] = ClusterWeightInfo{k, v}
i++
}
sort.Sort(p)
return p
// when weights is equal, sort by last scheduling replicas result,
// more last scheduling replicas means the remainders of the last scheduling were randomized to such clusters,
// so in order to keep the inertia in this scheduling, such clusters should also be prioritized
if p[i].LastReplicas != p[j].LastReplicas {
return p[i].LastReplicas > p[j].LastReplicas
}
// when last scheduling replicas is also equal, sort by random,
// first generate a random number within [0, 100) range,
// then return < if the actual number is in [0, 50) range, return > if is in [50, 100) range
const maxRandomNum = 100
randomNum, err := rand.Int(rand.Reader, big.NewInt(maxRandomNum))
return err == nil && randomNum.Cmp(big.NewInt(maxRandomNum/2)) >= 0
}

// GetWeightSum returns the sum of the weight info.
Expand Down Expand Up @@ -125,20 +127,28 @@ func (a *Dispenser) TakeByWeight(w ClusterWeightInfoList) {
}

// GetStaticWeightInfoListByTargetClusters constructs a weight list by target cluster slice.
func GetStaticWeightInfoListByTargetClusters(tcs []workv1alpha2.TargetCluster) ClusterWeightInfoList {
func GetStaticWeightInfoListByTargetClusters(tcs, scheduled []workv1alpha2.TargetCluster) ClusterWeightInfoList {
weightList := make(ClusterWeightInfoList, 0, len(tcs))
for _, result := range tcs {
for _, targetCluster := range tcs {
var lastReplicas int32
for _, scheduledCluster := range scheduled {
if targetCluster.Name == scheduledCluster.Name {
lastReplicas = scheduledCluster.Replicas
break
}
}
weightList = append(weightList, ClusterWeightInfo{
ClusterName: result.Name,
Weight: int64(result.Replicas),
ClusterName: targetCluster.Name,
Weight: int64(targetCluster.Replicas),
LastReplicas: lastReplicas,
})
}
return weightList
}

// SpreadReplicasByTargetClusters divides replicas by the weight of a target cluster list.
func SpreadReplicasByTargetClusters(numReplicas int32, tcs, init []workv1alpha2.TargetCluster) []workv1alpha2.TargetCluster {
weightList := GetStaticWeightInfoListByTargetClusters(tcs)
weightList := GetStaticWeightInfoListByTargetClusters(tcs, init)
disp := NewDispenser(numReplicas, init)
disp.TakeByWeight(weightList)
return disp.Result
Expand Down
Loading

0 comments on commit 3dac48b

Please sign in to comment.