Skip to content

Commit

Permalink
add subset capacity planning for UnitiedDeployment
Browse files Browse the repository at this point in the history
Signed-off-by: mingzhou.swx <[email protected]>
  • Loading branch information
mingzhou.swx committed Sep 26, 2023
1 parent 024644c commit c8ab96d
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 47 deletions.
19 changes: 19 additions & 0 deletions apis/apps/defaults/v1alpha1.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,25 @@ func SetDefaultsUnitedDeployment(obj *v1alpha1.UnitedDeployment, injectTemplateD
}
}
}

hasReplicasSettings := false
hasCapacitySettings := false
for _, subset := range obj.Spec.Topology.Subsets {
if subset.Replicas != nil {
hasReplicasSettings = true
}
if subset.MinReplicas != nil || subset.MaxReplicas != nil {
hasCapacitySettings = true
}
}
if hasCapacitySettings && !hasReplicasSettings {
for i := range obj.Spec.Topology.Subsets {
subset := &obj.Spec.Topology.Subsets[i]
if subset.MinReplicas == nil {
subset.MinReplicas = &intstr.IntOrString{Type: intstr.Int, IntVal: 0}
}
}
}
}

// SetDefaults_CloneSet set default values for CloneSet.
Expand Down
2 changes: 1 addition & 1 deletion apis/apps/v1alpha1/uniteddeployment_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ type Subset struct {
// Indicates the lower bounded replicas of the subset.
// MinReplicas must be more than or equal to 0 if it is set.
// Spec.Replicas must be more than or equal the sum of MinReplicas of all subsets.
// Defaults to nil.
// Defaults to 0.
// +optional
MinReplicas *intstr.IntOrString `json:"minReplicas,omitempty"`

Expand Down
2 changes: 1 addition & 1 deletion config/crd/bases/apps.kruise.io_uniteddeployments.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -992,7 +992,7 @@ spec:
description: Indicates the lower bounded replicas of the
subset. MinReplicas must be more than or equal to 0 if
it is set. Spec.Replicas must be more than or equal the
sum of MinReplicas of all subsets. Defaults to nil.
sum of MinReplicas of all subsets. Defaults to 0.
x-kubernetes-int-or-string: true
name:
description: Indicates subset name as a DNS_LABEL, which
Expand Down
118 changes: 102 additions & 16 deletions pkg/controller/uniteddeployment/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strings"

"k8s.io/klog/v2"
"k8s.io/utils/integer"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
)
Expand Down Expand Up @@ -54,33 +55,50 @@ func (n subsetInfos) Swap(i, j int) {
n[i], n[j] = n[j], n[i]
}

// GetAllocatedReplicas returns a mapping from subset to next replicas.
// Next replicas is allocated by replicasAllocator, which will consider the current replicas of each subset and
type ReplicaAllocator interface {
Alloc(nameToSubset *map[string]*Subset) (*map[string]int32, error)
}

func NewReplicaAllocator(ud *appsv1alpha1.UnitedDeployment) ReplicaAllocator {
for _, subset := range ud.Spec.Topology.Subsets {
if subset.MinReplicas != nil || subset.MaxReplicas != nil {
return &capacityAllocator{ud}
}
}
return &typicalAllocator{ud}
}

type typicalAllocator struct {
*appsv1alpha1.UnitedDeployment
}

// Alloc returns a mapping from subset to next replicas.
// Next replicas is allocated by realReplicasAllocator, which will consider the current replicas of each subset and
// new replicas indicated from UnitedDeployment.Spec.Topology.Subsets.
func GetAllocatedReplicas(nameToSubset *map[string]*Subset, ud *appsv1alpha1.UnitedDeployment) (*map[string]int32, error) {
subsetInfos := getSubsetInfos(nameToSubset, ud)
func (ac *typicalAllocator) Alloc(nameToSubset *map[string]*Subset) (*map[string]int32, error) {
subsetInfos := getSubsetInfos(nameToSubset, ac.UnitedDeployment)

var expectedReplicas int32 = -1
if ud.Spec.Replicas != nil {
expectedReplicas = *ud.Spec.Replicas
if ac.Spec.Replicas != nil {
expectedReplicas = *ac.Spec.Replicas
}

specifiedReplicas := getSpecifiedSubsetReplicas(expectedReplicas, ud)
klog.V(4).Infof("UnitedDeployment %s/%s specifiedReplicas: %v", ud.Namespace, ud.Name, specifiedReplicas)
specifiedReplicas := getSpecifiedSubsetReplicas(expectedReplicas, ac.UnitedDeployment)
klog.V(4).Infof("UnitedDeployment %s/%s specifiedReplicas: %v", ac.Namespace, ac.Name, specifiedReplicas)
// call SortToAllocator to sort all subset by subset.Replicas in order of increment
return subsetInfos.SortToAllocator().AllocateReplicas(expectedReplicas, specifiedReplicas)
}

func (n subsetInfos) SortToAllocator() *replicasAllocator {
func (n subsetInfos) SortToAllocator() *realReplicasAllocator {
sort.Sort(n)
return &replicasAllocator{subsets: &n}
return &realReplicasAllocator{subsets: &n}
}

type replicasAllocator struct {
type realReplicasAllocator struct {
subsets *subsetInfos
}

func (s *replicasAllocator) validateReplicas(replicas int32, subsetReplicasLimits *map[string]int32) error {
func (s *realReplicasAllocator) validateReplicas(replicas int32, subsetReplicasLimits *map[string]int32) error {
if subsetReplicasLimits == nil {
return nil
}
Expand Down Expand Up @@ -150,7 +168,7 @@ func getSubsetInfos(nameToSubset *map[string]*Subset, ud *appsv1alpha1.UnitedDep
// AllocateReplicas will first try to check the specifiedSubsetReplicas is valid or not.
// If valid , normalAllocate will be called. It will apply these specified replicas, then average the rest replicas to left unspecified subsets.
// If not, it will return error
func (s *replicasAllocator) AllocateReplicas(replicas int32, specifiedSubsetReplicas *map[string]int32) (
func (s *realReplicasAllocator) AllocateReplicas(replicas int32, specifiedSubsetReplicas *map[string]int32) (
*map[string]int32, error) {
if err := s.validateReplicas(replicas, specifiedSubsetReplicas); err != nil {
return nil, err
Expand All @@ -159,7 +177,7 @@ func (s *replicasAllocator) AllocateReplicas(replicas int32, specifiedSubsetRepl
return s.normalAllocate(replicas, specifiedSubsetReplicas), nil
}

func (s *replicasAllocator) normalAllocate(expectedReplicas int32, specifiedSubsetReplicas *map[string]int32) *map[string]int32 {
func (s *realReplicasAllocator) normalAllocate(expectedReplicas int32, specifiedSubsetReplicas *map[string]int32) *map[string]int32 {
var specifiedReplicas int32
specifiedSubsetCount := 0
// Step 1: apply replicas to specified subsets, and mark them as specified = true.
Expand Down Expand Up @@ -203,7 +221,7 @@ func (s *replicasAllocator) normalAllocate(expectedReplicas int32, specifiedSubs
return s.toSubsetReplicaMap()
}

func (s *replicasAllocator) toSubsetReplicaMap() *map[string]int32 {
func (s *realReplicasAllocator) toSubsetReplicaMap() *map[string]int32 {
allocatedReplicas := map[string]int32{}
for _, subset := range *s.subsets {
allocatedReplicas[subset.SubsetName] = subset.Replicas
Expand All @@ -212,7 +230,7 @@ func (s *replicasAllocator) toSubsetReplicaMap() *map[string]int32 {
return &allocatedReplicas
}

func (s *replicasAllocator) String() string {
func (s *realReplicasAllocator) String() string {
result := ""
sort.Sort(s.subsets)
for _, subset := range *s.subsets {
Expand All @@ -221,3 +239,71 @@ func (s *replicasAllocator) String() string {

return result
}

type capacityAllocator struct {
*appsv1alpha1.UnitedDeployment
}

// Alloc returns a mapping from subset to next replicas.
// Next replicas is allocated by realReplicasAllocator, which will consider the current replicas of each subset and
// new replicas indicated from UnitedDeployment.Spec.Topology.Subsets.
func (ac *capacityAllocator) Alloc(_ *map[string]*Subset) (*map[string]int32, error) {
replicas := int32(1)
if ac.Spec.Replicas != nil {
replicas = *ac.Spec.Replicas
}

minReplicasMap, maxReplicasMap, err := ac.validateAndCalculateMinMaxMap(replicas)
if err != nil {
return nil, err
}
return ac.alloc(replicas, minReplicasMap, maxReplicasMap), nil
}

func (ac *capacityAllocator) validateAndCalculateMinMaxMap(replicas int32) (map[string]int32, map[string]int32, error) {
totalMin, totalMax := int64(0), int64(0)
numSubset := len(ac.Spec.Topology.Subsets)
minReplicasMap := make(map[string]int32, numSubset)
maxReplicasMap := make(map[string]int32, numSubset)
for index, subset := range ac.Spec.Topology.Subsets {
minReplicas := int32(0)
if subset.MinReplicas != nil {
minReplicas, _ = ParseSubsetReplicas(replicas, *subset.MinReplicas)
}
totalMin += int64(minReplicas)
minReplicasMap[subset.Name] = minReplicas

maxReplicas := int32(1000000)
if subset.MaxReplicas != nil {
maxReplicas, _ = ParseSubsetReplicas(replicas, *subset.MaxReplicas)
}
totalMax += int64(maxReplicas)
maxReplicasMap[subset.Name] = maxReplicas

if minReplicas > maxReplicas {
return nil, nil, fmt.Errorf("subset[%d].minReplicas must be more than or equal to maxReplicas", index)
}
}
if int64(replicas) < totalMin {
return nil, nil, fmt.Errorf("spec.replicas must be more than or equal to sum of minReplicas of all subsets")
}
return minReplicasMap, maxReplicasMap, nil
}

func (ac *capacityAllocator) alloc(replicas int32, minReplicasMap, maxReplicasMap map[string]int32) *map[string]int32 {
allocated := int32(0)
for _, requiredAllocated := range minReplicasMap {
allocated += requiredAllocated
}

subsetReplicas := make(map[string]int32, len(ac.Spec.Topology.Subsets))
for _, subset := range ac.Spec.Topology.Subsets {
minReplicas := minReplicasMap[subset.Name]
maxReplicas := maxReplicasMap[subset.Name]
elasticAllocated := integer.Int32Min(maxReplicas-minReplicas, replicas-allocated)
elasticAllocated = integer.Int32Max(elasticAllocated, 0)
subsetReplicas[subset.Name] = minReplicas + elasticAllocated
allocated += elasticAllocated
}
return &subsetReplicas
}
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (r *ReconcileUnitedDeployment) Reconcile(_ context.Context, request reconci
return reconcile.Result{}, err
}

nextReplicas, err := GetAllocatedReplicas(nameToSubset, instance)
nextReplicas, err := NewReplicaAllocator(instance).Alloc(nameToSubset)
klog.V(4).Infof("Get UnitedDeployment %s/%s next replicas %v", instance.Namespace, instance.Name, nextReplicas)
if err != nil {
klog.Errorf("UnitedDeployment %s/%s Specified subset replicas is ineffective: %s",
Expand Down
120 changes: 92 additions & 28 deletions pkg/webhook/uniteddeployment/validating/uniteddeployment_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/apis/core"
corev1 "k8s.io/kubernetes/pkg/apis/core/v1"
apivalidation "k8s.io/kubernetes/pkg/apis/core/validation"
"k8s.io/utils/pointer"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
udctrl "github.com/openkruise/kruise/pkg/controller/uniteddeployment"
Expand Down Expand Up @@ -62,14 +63,9 @@ func validateUnitedDeploymentSpec(spec *appsv1alpha1.UnitedDeploymentSpec, fldPa
allErrs = append(allErrs, validateSubsetTemplate(&spec.Template, selector, fldPath.Child("template"))...)
}

var sumReplicas int32
var expectedReplicas int32 = -1
if spec.Replicas != nil {
expectedReplicas = *spec.Replicas
}
count := 0
subSetNames := sets.String{}
allErrs = append(allErrs, validateSubsetReplicas(spec.Replicas, spec.Topology.Subsets, fldPath.Child("topology", "subsets"))...)

subSetNames := sets.String{}
for i, subset := range spec.Topology.Subsets {
if len(subset.Name) == 0 {
allErrs = append(allErrs, field.Required(fldPath.Child("topology", "subsets").Index(i).Child("name"), ""))
Expand Down Expand Up @@ -108,39 +104,107 @@ func validateUnitedDeploymentSpec(spec *appsv1alpha1.UnitedDeploymentSpec, fldPa
if subset.Replicas == nil {
continue
}
}

replicas, err := udctrl.ParseSubsetReplicas(expectedReplicas, *subset.Replicas)
if err != nil {
allErrs = append(allErrs, field.Invalid(fldPath.Child("topology", "subsets").Index(i).Child("replicas"), subset.Replicas, fmt.Sprintf("invalid replicas %s", subset.Replicas.String())))
} else {
sumReplicas += replicas
count++
if spec.UpdateStrategy.ManualUpdate != nil {
for subset := range spec.UpdateStrategy.ManualUpdate.Partitions {
if !subSetNames.Has(subset) {
allErrs = append(allErrs, field.Invalid(fldPath.Child("updateStrategy", "partitions"), spec.UpdateStrategy.ManualUpdate.Partitions, fmt.Sprintf("subset %s does not exist", subset)))
}
}
}

if expectedReplicas != -1 {
// sum of subset replicas may be less than uniteddployment replicas
if sumReplicas > expectedReplicas {
allErrs = append(allErrs, field.Invalid(fldPath.Child("topology", "subsets"), sumReplicas, fmt.Sprintf("sum of indicated subset replicas %d should not be greater than UnitedDeployment replicas %d", sumReplicas, expectedReplicas)))
return allErrs
}

func validateSubsetReplicas(expectedReplicas *int32, subsets []appsv1alpha1.Subset, fldPath *field.Path) field.ErrorList {
var (
sumReplicas = int64(0)
sumMinReplicas = int64(0)
sumMaxReplicas = int64(0)

countReplicas = 0
countMaxReplicas = 0

hasReplicasSettings = false
hasCapacitySettings = false

err error
errList field.ErrorList
)

if expectedReplicas == nil {
expectedReplicas = pointer.Int32(-1)
}

for i, subset := range subsets {
replicas := int32(0)
if subset.Replicas != nil {
countReplicas++
hasReplicasSettings = true
replicas, err = udctrl.ParseSubsetReplicas(*expectedReplicas, *subset.Replicas)
if err != nil {
errList = append(errList, field.Invalid(fldPath.Index(i).Child("replicas"), subset.Replicas, err.Error()))
}
}
sumReplicas += int64(replicas)

minReplicas := int32(0)
if subset.MinReplicas != nil {
hasCapacitySettings = true
minReplicas, err = udctrl.ParseSubsetReplicas(*expectedReplicas, *subset.MinReplicas)
if err != nil {
errList = append(errList, field.Invalid(fldPath.Index(i).Child("minReplicas"), subset.MaxReplicas, err.Error()))
}
}
sumMinReplicas += int64(minReplicas)

maxReplicas := int32(1000000)
if subset.MaxReplicas != nil {
countMaxReplicas++
hasCapacitySettings = true
maxReplicas, err = udctrl.ParseSubsetReplicas(*expectedReplicas, *subset.MaxReplicas)
if err != nil {
errList = append(errList, field.Invalid(fldPath.Index(i).Child("minReplicas"), subset.MaxReplicas, err.Error()))
}
}
sumMaxReplicas += int64(maxReplicas)

if count > 0 && count == len(spec.Topology.Subsets) && sumReplicas != expectedReplicas {
allErrs = append(allErrs, field.Invalid(fldPath.Child("topology", "subsets"), sumReplicas, fmt.Sprintf("if replicas of all subsets are provided, the sum of indicated subset replicas %d should equal UnitedDeployment replicas %d", sumReplicas, expectedReplicas)))
if minReplicas > maxReplicas {
errList = append(errList, field.Invalid(fldPath.Index(i).Child("minReplicas"), subset.MaxReplicas,
fmt.Sprintf("subset[%d].minReplicas must be more than or equal to maxReplicas", i)))
}
} else if count != len(spec.Topology.Subsets) {
// validate all of subsets replicas are not nil
allErrs = append(allErrs, field.Invalid(fldPath.Child("topology", "subsets"), sumReplicas, "if UnitedDeployment replicas is not provided, replicas of all subsets should be provided"))
}

if spec.UpdateStrategy.ManualUpdate != nil {
for subset := range spec.UpdateStrategy.ManualUpdate.Partitions {
if !subSetNames.Has(subset) {
allErrs = append(allErrs, field.Invalid(fldPath.Child("updateStrategy", "partitions"), spec.UpdateStrategy.ManualUpdate.Partitions, fmt.Sprintf("subset %s does not exist", subset)))
if hasReplicasSettings && hasCapacitySettings {
errList = append(errList, field.Invalid(fldPath, subsets, "subset.Replicas and subset.MinReplicas/subset.MaxReplicas are mutually exclusive in a UnitedDeployment"))
return errList
}

if hasCapacitySettings {
if *expectedReplicas == -1 {
errList = append(errList, field.Invalid(fldPath, expectedReplicas, "spec.replicas must be not empty if you set subset.minReplicas/maxReplicas"))
} else if int64(*expectedReplicas) < sumMinReplicas {
errList = append(errList, field.Invalid(fldPath, expectedReplicas, "spec.replicas must be more than or equal to sum of minReplicas of all subsets"))
}
if countMaxReplicas >= len(subsets) {
errList = append(errList, field.Invalid(fldPath, countMaxReplicas, "at least one subset.maxReplicas must be empty"))
}
} else {
if *expectedReplicas != -1 {
// sum of subset replicas may be less than uniteddployment replicas
if sumReplicas > int64(*expectedReplicas) {
errList = append(errList, field.Invalid(fldPath, sumReplicas, fmt.Sprintf("sum of indicated subset replicas %d should not be greater than UnitedDeployment replicas %d", sumReplicas, expectedReplicas)))
}
if countReplicas > 0 && countReplicas == len(subsets) && sumReplicas != int64(*expectedReplicas) {
errList = append(errList, field.Invalid(fldPath, sumReplicas, fmt.Sprintf("if replicas of all subsets are provided, the sum of indicated subset replicas %d should equal UnitedDeployment replicas %d", sumReplicas, expectedReplicas)))
}
} else if countReplicas != len(subsets) {
// validate all of subsets replicas are not nil
errList = append(errList, field.Invalid(fldPath, sumReplicas, "if UnitedDeployment replicas is not provided, replicas of all subsets should be provided"))
}
}

return allErrs
return errList
}

// validateUnitedDeployment validates a UnitedDeployment.
Expand Down

0 comments on commit c8ab96d

Please sign in to comment.