diff --git a/apis/apps/defaults/v1alpha1.go b/apis/apps/defaults/v1alpha1.go index ea0991c1d8..8673b62556 100644 --- a/apis/apps/defaults/v1alpha1.go +++ b/apis/apps/defaults/v1alpha1.go @@ -225,6 +225,25 @@ func SetDefaultsUnitedDeployment(obj *v1alpha1.UnitedDeployment, injectTemplateD } } } + + hasReplicasSettings := false + hasCapacitySettings := false + for _, subset := range obj.Spec.Topology.Subsets { + if subset.Replicas != nil { + hasReplicasSettings = true + } + if subset.MinReplicas != nil || subset.MaxReplicas != nil { + hasCapacitySettings = true + } + } + if hasCapacitySettings && !hasReplicasSettings { + for i := range obj.Spec.Topology.Subsets { + subset := &obj.Spec.Topology.Subsets[i] + if subset.MinReplicas == nil { + subset.MinReplicas = &intstr.IntOrString{Type: intstr.Int, IntVal: 0} + } + } + } } // SetDefaults_CloneSet set default values for CloneSet. diff --git a/apis/apps/v1alpha1/uniteddeployment_types.go b/apis/apps/v1alpha1/uniteddeployment_types.go index 809ba0c601..1b29eb6eae 100644 --- a/apis/apps/v1alpha1/uniteddeployment_types.go +++ b/apis/apps/v1alpha1/uniteddeployment_types.go @@ -196,7 +196,7 @@ type Subset struct { // Indicates the lower bounded replicas of the subset. // MinReplicas must be more than or equal to 0 if it is set. // Spec.Replicas must be more than or equal the sum of MinReplicas of all subsets. - // Defaults to nil. + // Defaults to 0. // +optional MinReplicas *intstr.IntOrString `json:"minReplicas,omitempty"` diff --git a/config/crd/bases/apps.kruise.io_uniteddeployments.yaml b/config/crd/bases/apps.kruise.io_uniteddeployments.yaml index 4df98b730d..b4851af839 100644 --- a/config/crd/bases/apps.kruise.io_uniteddeployments.yaml +++ b/config/crd/bases/apps.kruise.io_uniteddeployments.yaml @@ -992,7 +992,7 @@ spec: description: Indicates the lower bounded replicas of the subset. MinReplicas must be more than or equal to 0 if it is set. Spec.Replicas must be more than or equal the - sum of MinReplicas of all subsets. Defaults to nil. + sum of MinReplicas of all subsets. Defaults to 0. x-kubernetes-int-or-string: true name: description: Indicates subset name as a DNS_LABEL, which diff --git a/pkg/controller/uniteddeployment/allocator.go b/pkg/controller/uniteddeployment/allocator.go index 0c0abea2d9..3631022dfb 100644 --- a/pkg/controller/uniteddeployment/allocator.go +++ b/pkg/controller/uniteddeployment/allocator.go @@ -22,6 +22,7 @@ import ( "strings" "k8s.io/klog/v2" + "k8s.io/utils/integer" appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" ) @@ -54,33 +55,50 @@ func (n subsetInfos) Swap(i, j int) { n[i], n[j] = n[j], n[i] } -// GetAllocatedReplicas returns a mapping from subset to next replicas. -// Next replicas is allocated by replicasAllocator, which will consider the current replicas of each subset and +type ReplicaAllocator interface { + Alloc(nameToSubset *map[string]*Subset) (*map[string]int32, error) +} + +func NewReplicaAllocator(ud *appsv1alpha1.UnitedDeployment) ReplicaAllocator { + for _, subset := range ud.Spec.Topology.Subsets { + if subset.MinReplicas != nil || subset.MaxReplicas != nil { + return &capacityAllocator{ud} + } + } + return &typicalAllocator{ud} +} + +type typicalAllocator struct { + *appsv1alpha1.UnitedDeployment +} + +// Alloc returns a mapping from subset to next replicas. +// Next replicas is allocated by realReplicasAllocator, which will consider the current replicas of each subset and // new replicas indicated from UnitedDeployment.Spec.Topology.Subsets. -func GetAllocatedReplicas(nameToSubset *map[string]*Subset, ud *appsv1alpha1.UnitedDeployment) (*map[string]int32, error) { - subsetInfos := getSubsetInfos(nameToSubset, ud) +func (ac *typicalAllocator) Alloc(nameToSubset *map[string]*Subset) (*map[string]int32, error) { + subsetInfos := getSubsetInfos(nameToSubset, ac.UnitedDeployment) var expectedReplicas int32 = -1 - if ud.Spec.Replicas != nil { - expectedReplicas = *ud.Spec.Replicas + if ac.Spec.Replicas != nil { + expectedReplicas = *ac.Spec.Replicas } - specifiedReplicas := getSpecifiedSubsetReplicas(expectedReplicas, ud) - klog.V(4).Infof("UnitedDeployment %s/%s specifiedReplicas: %v", ud.Namespace, ud.Name, specifiedReplicas) + specifiedReplicas := getSpecifiedSubsetReplicas(expectedReplicas, ac.UnitedDeployment) + klog.V(4).Infof("UnitedDeployment %s/%s specifiedReplicas: %v", ac.Namespace, ac.Name, specifiedReplicas) // call SortToAllocator to sort all subset by subset.Replicas in order of increment return subsetInfos.SortToAllocator().AllocateReplicas(expectedReplicas, specifiedReplicas) } -func (n subsetInfos) SortToAllocator() *replicasAllocator { +func (n subsetInfos) SortToAllocator() *realReplicasAllocator { sort.Sort(n) - return &replicasAllocator{subsets: &n} + return &realReplicasAllocator{subsets: &n} } -type replicasAllocator struct { +type realReplicasAllocator struct { subsets *subsetInfos } -func (s *replicasAllocator) validateReplicas(replicas int32, subsetReplicasLimits *map[string]int32) error { +func (s *realReplicasAllocator) validateReplicas(replicas int32, subsetReplicasLimits *map[string]int32) error { if subsetReplicasLimits == nil { return nil } @@ -150,7 +168,7 @@ func getSubsetInfos(nameToSubset *map[string]*Subset, ud *appsv1alpha1.UnitedDep // AllocateReplicas will first try to check the specifiedSubsetReplicas is valid or not. // If valid , normalAllocate will be called. It will apply these specified replicas, then average the rest replicas to left unspecified subsets. // If not, it will return error -func (s *replicasAllocator) AllocateReplicas(replicas int32, specifiedSubsetReplicas *map[string]int32) ( +func (s *realReplicasAllocator) AllocateReplicas(replicas int32, specifiedSubsetReplicas *map[string]int32) ( *map[string]int32, error) { if err := s.validateReplicas(replicas, specifiedSubsetReplicas); err != nil { return nil, err @@ -159,7 +177,7 @@ func (s *replicasAllocator) AllocateReplicas(replicas int32, specifiedSubsetRepl return s.normalAllocate(replicas, specifiedSubsetReplicas), nil } -func (s *replicasAllocator) normalAllocate(expectedReplicas int32, specifiedSubsetReplicas *map[string]int32) *map[string]int32 { +func (s *realReplicasAllocator) normalAllocate(expectedReplicas int32, specifiedSubsetReplicas *map[string]int32) *map[string]int32 { var specifiedReplicas int32 specifiedSubsetCount := 0 // Step 1: apply replicas to specified subsets, and mark them as specified = true. @@ -203,7 +221,7 @@ func (s *replicasAllocator) normalAllocate(expectedReplicas int32, specifiedSubs return s.toSubsetReplicaMap() } -func (s *replicasAllocator) toSubsetReplicaMap() *map[string]int32 { +func (s *realReplicasAllocator) toSubsetReplicaMap() *map[string]int32 { allocatedReplicas := map[string]int32{} for _, subset := range *s.subsets { allocatedReplicas[subset.SubsetName] = subset.Replicas @@ -212,7 +230,7 @@ func (s *replicasAllocator) toSubsetReplicaMap() *map[string]int32 { return &allocatedReplicas } -func (s *replicasAllocator) String() string { +func (s *realReplicasAllocator) String() string { result := "" sort.Sort(s.subsets) for _, subset := range *s.subsets { @@ -221,3 +239,71 @@ func (s *replicasAllocator) String() string { return result } + +type capacityAllocator struct { + *appsv1alpha1.UnitedDeployment +} + +// Alloc returns a mapping from subset to next replicas. +// Next replicas is allocated by realReplicasAllocator, which will consider the current replicas of each subset and +// new replicas indicated from UnitedDeployment.Spec.Topology.Subsets. +func (ac *capacityAllocator) Alloc(_ *map[string]*Subset) (*map[string]int32, error) { + replicas := int32(1) + if ac.Spec.Replicas != nil { + replicas = *ac.Spec.Replicas + } + + minReplicasMap, maxReplicasMap, err := ac.validateAndCalculateMinMaxMap(replicas) + if err != nil { + return nil, err + } + return ac.alloc(replicas, minReplicasMap, maxReplicasMap), nil +} + +func (ac *capacityAllocator) validateAndCalculateMinMaxMap(replicas int32) (map[string]int32, map[string]int32, error) { + totalMin, totalMax := int64(0), int64(0) + numSubset := len(ac.Spec.Topology.Subsets) + minReplicasMap := make(map[string]int32, numSubset) + maxReplicasMap := make(map[string]int32, numSubset) + for index, subset := range ac.Spec.Topology.Subsets { + minReplicas := int32(0) + if subset.MinReplicas != nil { + minReplicas, _ = ParseSubsetReplicas(replicas, *subset.MinReplicas) + } + totalMin += int64(minReplicas) + minReplicasMap[subset.Name] = minReplicas + + maxReplicas := int32(1000000) + if subset.MaxReplicas != nil { + maxReplicas, _ = ParseSubsetReplicas(replicas, *subset.MaxReplicas) + } + totalMax += int64(maxReplicas) + maxReplicasMap[subset.Name] = maxReplicas + + if minReplicas > maxReplicas { + return nil, nil, fmt.Errorf("subset[%d].minReplicas must be more than or equal to maxReplicas", index) + } + } + if int64(replicas) < totalMin { + return nil, nil, fmt.Errorf("spec.replicas must be more than or equal to sum of minReplicas of all subsets") + } + return minReplicasMap, maxReplicasMap, nil +} + +func (ac *capacityAllocator) alloc(replicas int32, minReplicasMap, maxReplicasMap map[string]int32) *map[string]int32 { + allocated := int32(0) + for _, requiredAllocated := range minReplicasMap { + allocated += requiredAllocated + } + + subsetReplicas := make(map[string]int32, len(ac.Spec.Topology.Subsets)) + for _, subset := range ac.Spec.Topology.Subsets { + minReplicas := minReplicasMap[subset.Name] + maxReplicas := maxReplicasMap[subset.Name] + elasticAllocated := integer.Int32Min(maxReplicas-minReplicas, replicas-allocated) + elasticAllocated = integer.Int32Max(elasticAllocated, 0) + subsetReplicas[subset.Name] = minReplicas + elasticAllocated + allocated += elasticAllocated + } + return &subsetReplicas +} diff --git a/pkg/controller/uniteddeployment/uniteddeployment_controller.go b/pkg/controller/uniteddeployment/uniteddeployment_controller.go index d785450263..b5334c4606 100644 --- a/pkg/controller/uniteddeployment/uniteddeployment_controller.go +++ b/pkg/controller/uniteddeployment/uniteddeployment_controller.go @@ -213,7 +213,7 @@ func (r *ReconcileUnitedDeployment) Reconcile(_ context.Context, request reconci return reconcile.Result{}, err } - nextReplicas, err := GetAllocatedReplicas(nameToSubset, instance) + nextReplicas, err := NewReplicaAllocator(instance).Alloc(nameToSubset) klog.V(4).Infof("Get UnitedDeployment %s/%s next replicas %v", instance.Namespace, instance.Name, nextReplicas) if err != nil { klog.Errorf("UnitedDeployment %s/%s Specified subset replicas is ineffective: %s", diff --git a/pkg/webhook/uniteddeployment/validating/uniteddeployment_validation.go b/pkg/webhook/uniteddeployment/validating/uniteddeployment_validation.go index 2de34fd4a0..0f0b057414 100644 --- a/pkg/webhook/uniteddeployment/validating/uniteddeployment_validation.go +++ b/pkg/webhook/uniteddeployment/validating/uniteddeployment_validation.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/apis/core" corev1 "k8s.io/kubernetes/pkg/apis/core/v1" apivalidation "k8s.io/kubernetes/pkg/apis/core/validation" + "k8s.io/utils/pointer" appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" udctrl "github.com/openkruise/kruise/pkg/controller/uniteddeployment" @@ -62,14 +63,9 @@ func validateUnitedDeploymentSpec(spec *appsv1alpha1.UnitedDeploymentSpec, fldPa allErrs = append(allErrs, validateSubsetTemplate(&spec.Template, selector, fldPath.Child("template"))...) } - var sumReplicas int32 - var expectedReplicas int32 = -1 - if spec.Replicas != nil { - expectedReplicas = *spec.Replicas - } - count := 0 - subSetNames := sets.String{} + allErrs = append(allErrs, validateSubsetReplicas(spec.Replicas, spec.Topology.Subsets, fldPath.Child("topology", "subsets"))...) + subSetNames := sets.String{} for i, subset := range spec.Topology.Subsets { if len(subset.Name) == 0 { allErrs = append(allErrs, field.Required(fldPath.Child("topology", "subsets").Index(i).Child("name"), "")) @@ -108,39 +104,107 @@ func validateUnitedDeploymentSpec(spec *appsv1alpha1.UnitedDeploymentSpec, fldPa if subset.Replicas == nil { continue } + } - replicas, err := udctrl.ParseSubsetReplicas(expectedReplicas, *subset.Replicas) - if err != nil { - allErrs = append(allErrs, field.Invalid(fldPath.Child("topology", "subsets").Index(i).Child("replicas"), subset.Replicas, fmt.Sprintf("invalid replicas %s", subset.Replicas.String()))) - } else { - sumReplicas += replicas - count++ + if spec.UpdateStrategy.ManualUpdate != nil { + for subset := range spec.UpdateStrategy.ManualUpdate.Partitions { + if !subSetNames.Has(subset) { + allErrs = append(allErrs, field.Invalid(fldPath.Child("updateStrategy", "partitions"), spec.UpdateStrategy.ManualUpdate.Partitions, fmt.Sprintf("subset %s does not exist", subset))) + } } } - if expectedReplicas != -1 { - // sum of subset replicas may be less than uniteddployment replicas - if sumReplicas > expectedReplicas { - allErrs = append(allErrs, field.Invalid(fldPath.Child("topology", "subsets"), sumReplicas, fmt.Sprintf("sum of indicated subset replicas %d should not be greater than UnitedDeployment replicas %d", sumReplicas, expectedReplicas))) + return allErrs +} + +func validateSubsetReplicas(expectedReplicas *int32, subsets []appsv1alpha1.Subset, fldPath *field.Path) field.ErrorList { + var ( + sumReplicas = int64(0) + sumMinReplicas = int64(0) + sumMaxReplicas = int64(0) + + countReplicas = 0 + countMaxReplicas = 0 + + hasReplicasSettings = false + hasCapacitySettings = false + + err error + errList field.ErrorList + ) + + if expectedReplicas == nil { + expectedReplicas = pointer.Int32(-1) + } + + for i, subset := range subsets { + replicas := int32(0) + if subset.Replicas != nil { + countReplicas++ + hasReplicasSettings = true + replicas, err = udctrl.ParseSubsetReplicas(*expectedReplicas, *subset.Replicas) + if err != nil { + errList = append(errList, field.Invalid(fldPath.Index(i).Child("replicas"), subset.Replicas, err.Error())) + } + } + sumReplicas += int64(replicas) + + minReplicas := int32(0) + if subset.MinReplicas != nil { + hasCapacitySettings = true + minReplicas, err = udctrl.ParseSubsetReplicas(*expectedReplicas, *subset.MinReplicas) + if err != nil { + errList = append(errList, field.Invalid(fldPath.Index(i).Child("minReplicas"), subset.MaxReplicas, err.Error())) + } } + sumMinReplicas += int64(minReplicas) + + maxReplicas := int32(1000000) + if subset.MaxReplicas != nil { + countMaxReplicas++ + hasCapacitySettings = true + maxReplicas, err = udctrl.ParseSubsetReplicas(*expectedReplicas, *subset.MaxReplicas) + if err != nil { + errList = append(errList, field.Invalid(fldPath.Index(i).Child("minReplicas"), subset.MaxReplicas, err.Error())) + } + } + sumMaxReplicas += int64(maxReplicas) - if count > 0 && count == len(spec.Topology.Subsets) && sumReplicas != expectedReplicas { - allErrs = append(allErrs, field.Invalid(fldPath.Child("topology", "subsets"), sumReplicas, fmt.Sprintf("if replicas of all subsets are provided, the sum of indicated subset replicas %d should equal UnitedDeployment replicas %d", sumReplicas, expectedReplicas))) + if minReplicas > maxReplicas { + errList = append(errList, field.Invalid(fldPath.Index(i).Child("minReplicas"), subset.MaxReplicas, + fmt.Sprintf("subset[%d].minReplicas must be more than or equal to maxReplicas", i))) } - } else if count != len(spec.Topology.Subsets) { - // validate all of subsets replicas are not nil - allErrs = append(allErrs, field.Invalid(fldPath.Child("topology", "subsets"), sumReplicas, "if UnitedDeployment replicas is not provided, replicas of all subsets should be provided")) } - if spec.UpdateStrategy.ManualUpdate != nil { - for subset := range spec.UpdateStrategy.ManualUpdate.Partitions { - if !subSetNames.Has(subset) { - allErrs = append(allErrs, field.Invalid(fldPath.Child("updateStrategy", "partitions"), spec.UpdateStrategy.ManualUpdate.Partitions, fmt.Sprintf("subset %s does not exist", subset))) + if hasReplicasSettings && hasCapacitySettings { + errList = append(errList, field.Invalid(fldPath, subsets, "subset.Replicas and subset.MinReplicas/subset.MaxReplicas are mutually exclusive in a UnitedDeployment")) + return errList + } + + if hasCapacitySettings { + if *expectedReplicas == -1 { + errList = append(errList, field.Invalid(fldPath, expectedReplicas, "spec.replicas must be not empty if you set subset.minReplicas/maxReplicas")) + } else if int64(*expectedReplicas) < sumMinReplicas { + errList = append(errList, field.Invalid(fldPath, expectedReplicas, "spec.replicas must be more than or equal to sum of minReplicas of all subsets")) + } + if countMaxReplicas >= len(subsets) { + errList = append(errList, field.Invalid(fldPath, countMaxReplicas, "at least one subset.maxReplicas must be empty")) + } + } else { + if *expectedReplicas != -1 { + // sum of subset replicas may be less than uniteddployment replicas + if sumReplicas > int64(*expectedReplicas) { + errList = append(errList, field.Invalid(fldPath, sumReplicas, fmt.Sprintf("sum of indicated subset replicas %d should not be greater than UnitedDeployment replicas %d", sumReplicas, expectedReplicas))) + } + if countReplicas > 0 && countReplicas == len(subsets) && sumReplicas != int64(*expectedReplicas) { + errList = append(errList, field.Invalid(fldPath, sumReplicas, fmt.Sprintf("if replicas of all subsets are provided, the sum of indicated subset replicas %d should equal UnitedDeployment replicas %d", sumReplicas, expectedReplicas))) } + } else if countReplicas != len(subsets) { + // validate all of subsets replicas are not nil + errList = append(errList, field.Invalid(fldPath, sumReplicas, "if UnitedDeployment replicas is not provided, replicas of all subsets should be provided")) } } - - return allErrs + return errList } // validateUnitedDeployment validates a UnitedDeployment.