Skip to content

Commit

Permalink
Make maxUnavailable also limited to pods in new revision
Browse files Browse the repository at this point in the history
Signed-off-by: FillZpp <[email protected]>
  • Loading branch information
FillZpp committed Jan 25, 2022
1 parent af3e254 commit afce790
Show file tree
Hide file tree
Showing 2 changed files with 178 additions and 56 deletions.
32 changes: 21 additions & 11 deletions pkg/controller/cloneset/sync/cloneset_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ func (c *realControl) Update(cs *appsv1alpha1.CloneSet,
}

// 3. find all matched pods can update
targetRevision := updateRevision
if diffRes.updateNum < 0 {
targetRevision = currentRevision
}
var waitUpdateIndexes []int
for i, pod := range pods {
if coreControl.IsPodUpdatePaused(pod) {
Expand Down Expand Up @@ -115,7 +119,7 @@ func (c *realControl) Update(cs *appsv1alpha1.CloneSet,
waitUpdateIndexes = SortUpdateIndexes(coreControl, cs.Spec.UpdateStrategy, pods, waitUpdateIndexes)

// 5. limit max count of pods can update
waitUpdateIndexes = limitUpdateIndexes(coreControl, cs.Spec.MinReadySeconds, diffRes, waitUpdateIndexes, pods)
waitUpdateIndexes = limitUpdateIndexes(coreControl, cs.Spec.MinReadySeconds, diffRes, waitUpdateIndexes, pods, targetRevision.Name)

// Determine the pub before updating the pod
var pub *policyv1alpha1.PodUnavailableBudget
Expand All @@ -139,10 +143,6 @@ func (c *realControl) Update(cs *appsv1alpha1.CloneSet,
return time.Second, nil
}
}
targetRevision := updateRevision
if diffRes.updateNum < 0 {
targetRevision = currentRevision
}
duration, err := c.updatePod(cs, coreControl, targetRevision, revisions, pod, pvcs)
if duration > 0 {
requeueDuration.Update(duration)
Expand Down Expand Up @@ -283,7 +283,7 @@ func (c *realControl) updatePod(cs *appsv1alpha1.CloneSet, coreControl clonesetc
return 0, nil
}

// SortUpdateIndexes sorts the given waitUpdateIndexes of Pods to update according to the CloneSet strategy.
// SortUpdateIndexes sorts the given oldRevisionIndexes of Pods to update according to the CloneSet strategy.
func SortUpdateIndexes(coreControl clonesetcore.Control, strategy appsv1alpha1.CloneSetUpdateStrategy, pods []*v1.Pod, waitUpdateIndexes []int) []int {
// Sort Pods with default sequence
sort.Slice(waitUpdateIndexes, coreControl.GetPodsSortFunc(pods, waitUpdateIndexes))
Expand All @@ -308,24 +308,34 @@ func SortUpdateIndexes(coreControl clonesetcore.Control, strategy appsv1alpha1.C
}

// limitUpdateIndexes limits all pods waiting update by the maxUnavailable policy, and returns the indexes of pods that can finally update
func limitUpdateIndexes(coreControl clonesetcore.Control, minReadySeconds int32, diffRes expectationDiffs, waitUpdateIndexes []int, pods []*v1.Pod) []int {
func limitUpdateIndexes(coreControl clonesetcore.Control, minReadySeconds int32, diffRes expectationDiffs, waitUpdateIndexes []int, pods []*v1.Pod, targetRevisionHash string) []int {
updateDiff := util.IntAbs(diffRes.updateNum)
if updateDiff < len(waitUpdateIndexes) {
waitUpdateIndexes = waitUpdateIndexes[:updateDiff]
}

var notReadyCount, canUpdateCount int
var unavailableCount, targetRevisionUnavailableCount, canUpdateCount int
for _, p := range pods {
if !isPodAvailable(coreControl, p, minReadySeconds) {
notReadyCount++
unavailableCount++
if clonesetutils.EqualToRevisionHash("", p, targetRevisionHash) {
targetRevisionUnavailableCount++
}
}
}
for _, i := range waitUpdateIndexes {
// Make sure unavailable pods in target revision should not be more than maxUnavailable.
if targetRevisionUnavailableCount+canUpdateCount >= diffRes.updateMaxUnavailable {
break
}

// Make sure unavailable pods in all revisions should not be more than maxUnavailable.
// Note that update an old pod that already be unavailable will not increase the unavailable number.
if isPodAvailable(coreControl, pods[i], minReadySeconds) {
if notReadyCount >= diffRes.updateMaxUnavailable {
if unavailableCount >= diffRes.updateMaxUnavailable {
break
}
notReadyCount++
unavailableCount++
}
canUpdateCount++
}
Expand Down
202 changes: 157 additions & 45 deletions pkg/controller/cloneset/sync/cloneset_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package sync

import (
"context"
"fmt"
"reflect"
"testing"
"time"
Expand All @@ -27,8 +28,10 @@ import (
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
clonesetcore "github.com/openkruise/kruise/pkg/controller/cloneset/core"
clonesetutils "github.com/openkruise/kruise/pkg/controller/cloneset/utils"
"github.com/openkruise/kruise/pkg/features"
"github.com/openkruise/kruise/pkg/util"
"github.com/openkruise/kruise/pkg/util/controllerfinder"
utilfeature "github.com/openkruise/kruise/pkg/util/feature"
"github.com/openkruise/kruise/pkg/util/inplaceupdate"
"github.com/openkruise/kruise/pkg/util/lifecycle"
apps "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -645,98 +648,207 @@ func TestSortUpdateIndexes(t *testing.T) {
}

func TestCalculateUpdateCount(t *testing.T) {
// Enable the CloneSetPartitionRollback feature-gate
_ = utilfeature.DefaultMutableFeatureGate.Set(fmt.Sprintf("%s=true", features.CloneSetPartitionRollback))

readyPod := func() *v1.Pod {
return &v1.Pod{Status: v1.PodStatus{Phase: v1.PodRunning, Conditions: []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}}}}
}
cases := []struct {
strategy appsv1alpha1.CloneSetUpdateStrategy
totalReplicas int
waitUpdateIndexes []int
pods []*v1.Pod
expectedResult int
strategy appsv1alpha1.CloneSetUpdateStrategy
totalReplicas int
oldRevisionIndexes []int
pods []*v1.Pod
expectedResult int
}{
{
strategy: appsv1alpha1.CloneSetUpdateStrategy{},
totalReplicas: 3,
waitUpdateIndexes: []int{0, 1, 2},
pods: []*v1.Pod{readyPod(), readyPod(), readyPod()},
expectedResult: 1,
strategy: appsv1alpha1.CloneSetUpdateStrategy{},
totalReplicas: 3,
oldRevisionIndexes: []int{0, 1, 2},
pods: []*v1.Pod{readyPod(), readyPod(), readyPod()},
expectedResult: 1,
},
{
strategy: appsv1alpha1.CloneSetUpdateStrategy{},
totalReplicas: 3,
oldRevisionIndexes: []int{0, 1, 2},
pods: []*v1.Pod{readyPod(), {}, readyPod()},
expectedResult: 0,
},
{
strategy: appsv1alpha1.CloneSetUpdateStrategy{},
totalReplicas: 3,
oldRevisionIndexes: []int{0, 1, 2},
pods: []*v1.Pod{{}, readyPod(), readyPod()},
expectedResult: 1,
},
{
strategy: appsv1alpha1.CloneSetUpdateStrategy{},
totalReplicas: 10,
oldRevisionIndexes: []int{0, 1, 2, 3, 4, 5, 6, 7, 8},
pods: []*v1.Pod{{}, readyPod(), readyPod(), readyPod(), readyPod(), readyPod(), readyPod(), readyPod(), {}, readyPod()},
expectedResult: 1,
},
{
strategy: appsv1alpha1.CloneSetUpdateStrategy{Partition: util.GetIntOrStrPointer(intstrutil.FromInt(2)), MaxUnavailable: intstrutil.ValueOrDefault(nil, intstrutil.FromInt(3))},
totalReplicas: 3,
oldRevisionIndexes: []int{0, 1},
pods: []*v1.Pod{{}, readyPod(), readyPod()},
expectedResult: 0,
},
{
strategy: appsv1alpha1.CloneSetUpdateStrategy{Partition: util.GetIntOrStrPointer(intstrutil.FromInt(2)), MaxUnavailable: intstrutil.ValueOrDefault(nil, intstrutil.FromString("50%"))},
totalReplicas: 8,
oldRevisionIndexes: []int{0, 1, 2, 3, 4, 5, 6},
pods: []*v1.Pod{{}, readyPod(), {}, readyPod(), readyPod(), readyPod(), readyPod(), {}},
expectedResult: 3,
},
{
// old revision all unavailable, partition = 0, maxUnavailable = 2, should only update 2 pods
strategy: appsv1alpha1.CloneSetUpdateStrategy{
Partition: util.GetIntOrStrPointer(intstrutil.FromInt(0)),
MaxUnavailable: intstrutil.ValueOrDefault(nil, intstrutil.FromInt(2)),
},
totalReplicas: 5,
oldRevisionIndexes: []int{0, 1, 2, 3, 4},
pods: []*v1.Pod{{}, {}, {}, {}, {}},
expectedResult: 2,
},
{
// old revision all unavailable, partition = 0, maxUnavailable = 2, 2 updating, should not update pods
strategy: appsv1alpha1.CloneSetUpdateStrategy{
Partition: util.GetIntOrStrPointer(intstrutil.FromInt(0)),
MaxUnavailable: intstrutil.ValueOrDefault(nil, intstrutil.FromInt(2)),
},
totalReplicas: 5,
oldRevisionIndexes: []int{0, 1, 2},
pods: []*v1.Pod{{}, {}, {}, {}, {}},
expectedResult: 0,
},
{
strategy: appsv1alpha1.CloneSetUpdateStrategy{},
totalReplicas: 3,
waitUpdateIndexes: []int{0, 1, 2},
pods: []*v1.Pod{readyPod(), {}, readyPod()},
expectedResult: 0,
// old revision all unavailable, partition = 0, maxUnavailable = 2, 1 updated and 1 updating, should only update 1 pods
strategy: appsv1alpha1.CloneSetUpdateStrategy{
Partition: util.GetIntOrStrPointer(intstrutil.FromInt(0)),
MaxUnavailable: intstrutil.ValueOrDefault(nil, intstrutil.FromInt(2)),
},
totalReplicas: 5,
oldRevisionIndexes: []int{0, 1, 2},
pods: []*v1.Pod{{}, {}, {}, readyPod(), {}},
expectedResult: 1,
},
{
strategy: appsv1alpha1.CloneSetUpdateStrategy{},
totalReplicas: 3,
waitUpdateIndexes: []int{0, 1, 2},
pods: []*v1.Pod{{}, readyPod(), readyPod()},
expectedResult: 1,
// old revision all unavailable, partition = 0, maxUnavailable = 2, maxSurge = 1, 1 creating, should only update 2 pods
strategy: appsv1alpha1.CloneSetUpdateStrategy{
Partition: util.GetIntOrStrPointer(intstrutil.FromInt(0)),
MaxUnavailable: intstrutil.ValueOrDefault(nil, intstrutil.FromInt(2)),
MaxSurge: intstrutil.ValueOrDefault(nil, intstrutil.FromInt(1)),
},
totalReplicas: 5,
oldRevisionIndexes: []int{0, 1, 2, 3, 4},
pods: []*v1.Pod{{}, {}, {}, {}, {}, {}},
expectedResult: 2,
},
{
strategy: appsv1alpha1.CloneSetUpdateStrategy{},
totalReplicas: 10,
waitUpdateIndexes: []int{0, 1, 2, 3, 4, 5, 6, 7, 8},
pods: []*v1.Pod{{}, readyPod(), readyPod(), readyPod(), readyPod(), readyPod(), readyPod(), readyPod(), {}, readyPod()},
expectedResult: 1,
// old revision all unavailable, partition = 0, maxUnavailable = 2, maxSurge = 1, 1 updated and 1 updating, should only update 2 pods
strategy: appsv1alpha1.CloneSetUpdateStrategy{
Partition: util.GetIntOrStrPointer(intstrutil.FromInt(0)),
MaxUnavailable: intstrutil.ValueOrDefault(nil, intstrutil.FromInt(2)),
MaxSurge: intstrutil.ValueOrDefault(nil, intstrutil.FromInt(1)),
},
totalReplicas: 5,
oldRevisionIndexes: []int{0, 1, 2, 3},
pods: []*v1.Pod{{}, {}, {}, {}, readyPod(), {}},
expectedResult: 2,
},
{
strategy: appsv1alpha1.CloneSetUpdateStrategy{Partition: util.GetIntOrStrPointer(intstrutil.FromInt(2)), MaxUnavailable: intstrutil.ValueOrDefault(nil, intstrutil.FromInt(3))},
totalReplicas: 3,
waitUpdateIndexes: []int{0, 1},
pods: []*v1.Pod{{}, readyPod(), readyPod()},
expectedResult: 0,
// old revision all unavailable, partition = 0, maxUnavailable = 2, maxSurge = 1, 1 updated and 2 updating, should only update 1 pods
strategy: appsv1alpha1.CloneSetUpdateStrategy{
Partition: util.GetIntOrStrPointer(intstrutil.FromInt(0)),
MaxUnavailable: intstrutil.ValueOrDefault(nil, intstrutil.FromInt(2)),
MaxSurge: intstrutil.ValueOrDefault(nil, intstrutil.FromInt(1)),
},
totalReplicas: 5,
oldRevisionIndexes: []int{0, 1, 2},
pods: []*v1.Pod{{}, {}, {}, {}, readyPod(), {}},
expectedResult: 1,
},
{
// old revision all unavailable, partition = 0, maxUnavailable = 2, maxSurge = 1, 3 updating, should not update pods
strategy: appsv1alpha1.CloneSetUpdateStrategy{
Partition: util.GetIntOrStrPointer(intstrutil.FromInt(0)),
MaxUnavailable: intstrutil.ValueOrDefault(nil, intstrutil.FromInt(2)),
MaxSurge: intstrutil.ValueOrDefault(nil, intstrutil.FromInt(1)),
},
totalReplicas: 5,
oldRevisionIndexes: []int{0, 1, 2},
pods: []*v1.Pod{{}, {}, {}, {}, {}, {}},
expectedResult: 0,
},
{
strategy: appsv1alpha1.CloneSetUpdateStrategy{Partition: util.GetIntOrStrPointer(intstrutil.FromInt(2)), MaxUnavailable: intstrutil.ValueOrDefault(nil, intstrutil.FromString("50%"))},
totalReplicas: 8,
waitUpdateIndexes: []int{0, 1, 2, 3, 4, 5, 6},
pods: []*v1.Pod{{}, readyPod(), {}, readyPod(), readyPod(), readyPod(), readyPod(), {}},
expectedResult: 3,
// rollback with maxUnavailable and pods in new revision are unavailable
strategy: appsv1alpha1.CloneSetUpdateStrategy{
Partition: util.GetIntOrStrPointer(intstrutil.FromInt(7)),
MaxUnavailable: intstrutil.ValueOrDefault(nil, intstrutil.FromInt(2)),
},
totalReplicas: 8,
oldRevisionIndexes: []int{0, 1, 2},
pods: []*v1.Pod{readyPod(), readyPod(), readyPod(), {}, {}, {}, {}, {}},
expectedResult: 2,
},
{
// maxUnavailable = 0 and maxSurge = 2, usedSurge = 1
strategy: appsv1alpha1.CloneSetUpdateStrategy{
MaxUnavailable: intstrutil.ValueOrDefault(nil, intstrutil.FromInt(0)),
MaxSurge: intstrutil.ValueOrDefault(nil, intstrutil.FromInt(2)),
},
totalReplicas: 4,
waitUpdateIndexes: []int{0, 1},
pods: []*v1.Pod{readyPod(), readyPod(), readyPod(), readyPod(), readyPod()},
expectedResult: 1,
totalReplicas: 4,
oldRevisionIndexes: []int{0, 1},
pods: []*v1.Pod{readyPod(), readyPod(), readyPod(), readyPod(), readyPod()},
expectedResult: 1,
},
{
// maxUnavailable = 0 and maxSurge = 2, usedSurge = 2
strategy: appsv1alpha1.CloneSetUpdateStrategy{
MaxUnavailable: intstrutil.ValueOrDefault(nil, intstrutil.FromInt(0)),
MaxSurge: intstrutil.ValueOrDefault(nil, intstrutil.FromInt(2)),
},
totalReplicas: 4,
waitUpdateIndexes: []int{0, 1, 2, 3},
pods: []*v1.Pod{readyPod(), readyPod(), readyPod(), readyPod(), readyPod(), readyPod()},
expectedResult: 2,
totalReplicas: 4,
oldRevisionIndexes: []int{0, 1, 2, 3},
pods: []*v1.Pod{readyPod(), readyPod(), readyPod(), readyPod(), readyPod(), readyPod()},
expectedResult: 2,
},
}

coreControl := clonesetcore.New(&appsv1alpha1.CloneSet{})
for i, tc := range cases {
currentRevision := "current"
updateRevision := "updated"
indexes := sets.NewInt(tc.waitUpdateIndexes...)
indexes := sets.NewInt(tc.oldRevisionIndexes...)
var newRevisionIndexes []int
for i, pod := range tc.pods {
if !indexes.Has(i) {
newRevisionIndexes = append(newRevisionIndexes, i)
pod.Labels = map[string]string{apps.ControllerRevisionHashLabelKey: updateRevision}
} else {
pod.Labels = map[string]string{apps.ControllerRevisionHashLabelKey: currentRevision}
}
}

replicas := int32(tc.totalReplicas)
cs := &appsv1alpha1.CloneSet{Spec: appsv1alpha1.CloneSetSpec{Replicas: &replicas, UpdateStrategy: tc.strategy}}
diffRes := calculateDiffsWithExpectation(cs, tc.pods, currentRevision, updateRevision)

res := limitUpdateIndexes(coreControl, 0, diffRes, tc.waitUpdateIndexes, tc.pods)
var waitUpdateIndexes []int
var targetRevision string
if diffRes.updateNum > 0 {
waitUpdateIndexes = tc.oldRevisionIndexes
targetRevision = updateRevision
} else if diffRes.updateNum < 0 {
waitUpdateIndexes = newRevisionIndexes
targetRevision = currentRevision
}

res := limitUpdateIndexes(coreControl, 0, diffRes, waitUpdateIndexes, tc.pods, targetRevision)
if len(res) != tc.expectedResult {
t.Fatalf("case #%d failed, expected %d, got %d", i, tc.expectedResult, res)
}
Expand Down

0 comments on commit afce790

Please sign in to comment.