From cf50f437c3edb41f49f3b9d8e57e63ec2d196f04 Mon Sep 17 00:00:00 2001
From: Abner-1
Date: Mon, 9 Sep 2024 17:42:02 +0800
Subject: [PATCH] support specified-delete in asts

Signed-off-by: Abner-1
---
 .../statefulset/stateful_set_control.go      |  28 +++-
 .../statefulset/stateful_set_control_test.go | 130 ++++++++++++++++++
 2 files changed, 157 insertions(+), 1 deletion(-)

diff --git a/pkg/controller/statefulset/stateful_set_control.go b/pkg/controller/statefulset/stateful_set_control.go
index 8579b2feb6..c7fdab6df9 100644
--- a/pkg/controller/statefulset/stateful_set_control.go
+++ b/pkg/controller/statefulset/stateful_set_control.go
@@ -45,6 +45,7 @@ import (
 	imagejobutilfunc "github.com/openkruise/kruise/pkg/util/imagejob/utilfunction"
 	"github.com/openkruise/kruise/pkg/util/inplaceupdate"
 	"github.com/openkruise/kruise/pkg/util/lifecycle"
+	"github.com/openkruise/kruise/pkg/util/specifieddelete"
 )
 
 // Realistic value for maximum in-flight requests when processing in parallel mode.
@@ -622,11 +623,36 @@ func (ssc *defaultStatefulSetControl) rollingUpdateStatefulsetPods(
 		}
 	}
 
+	// handle specified-deleted pods, protected by maxUnavailable
+	for target := len(replicas) - 1; target >= 0; target-- {
+		if replicas[target] == nil {
+			continue
+		}
+		// if the unavailable pod count has reached maxUnavailable and the target is available, we can't process it.
+		// Why skip here rather than return?
+		// case: pod 0 ready, pod 1 unready, pod 2 unready, pod 3 ready, pod 4 ready
+		// with maxUnavailable = 3, pod 4 marked for specified delete will be deleted but pod 3 can't be,
+		// while pods 1 and 2 can still be deleted because they are already unavailable
+		if len(unavailablePods) >= maxUnavailable && !unavailablePods.Has(replicas[target].Name) {
+			continue
+		}
+		if specifieddelete.IsSpecifiedDelete(replicas[target]) {
+			if _, err := ssc.deletePod(set, replicas[target]); err != nil {
+				return status, err
+			}
+			// mark target as unavailable because it has just been deleted
+			unavailablePods.Insert(replicas[target].Name)
+
+			if getPodRevision(replicas[target]) == currentRevision.Name {
+				status.CurrentReplicas--
+			}
+			continue
+		}
+	}
 	updateIndexes := sortPodsToUpdate(set.Spec.UpdateStrategy.RollingUpdate, updateRevision.Name, *set.Spec.Replicas, replicas)
 	klog.V(3).InfoS("Prepare to update pods indexes for StatefulSet", "statefulSet", klog.KObj(set), "podIndexes", updateIndexes)
 	// update pods in sequence
 	for _, target := range updateIndexes {
-		// the target is already up-to-date, go to next
 		if getPodRevision(replicas[target]) == updateRevision.Name {
 			continue
 		}
diff --git a/pkg/controller/statefulset/stateful_set_control_test.go b/pkg/controller/statefulset/stateful_set_control_test.go
index 01a2a4beba..8f3e34d77e 100644
--- a/pkg/controller/statefulset/stateful_set_control_test.go
+++ b/pkg/controller/statefulset/stateful_set_control_test.go
@@ -30,6 +30,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/assert"
 	apps "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	v1 "k8s.io/api/core/v1"
@@ -54,6 +55,7 @@ import (
 	utilpointer "k8s.io/utils/pointer"
 
 	appspub "github.com/openkruise/kruise/apis/apps/pub"
+	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
 	appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1"
 	kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned"
 	kruisefake "github.com/openkruise/kruise/pkg/client/clientset/versioned/fake"
@@ -2215,6 +2217,119 @@ func TestStatefulSetControlRollingUpdateBlockByMaxUnavailable(t *testing.T) {
 	}
 }
 
+func TestStatefulSetControlRollingUpdateWithSpecifiedDelete(t *testing.T) {
+	set := burst(newStatefulSet(6))
+	var partition int32 = 3
+	var maxUnavailable = intstr.FromInt(3)
+	set.Spec.UpdateStrategy = appsv1beta1.StatefulSetUpdateStrategy{
+		Type: apps.RollingUpdateStatefulSetStrategyType,
+		RollingUpdate: func() *appsv1beta1.RollingUpdateStatefulSetStrategy {
+			return &appsv1beta1.RollingUpdateStatefulSetStrategy{
+				Partition:       &partition,
+				MaxUnavailable:  &maxUnavailable,
+				PodUpdatePolicy: appsv1beta1.InPlaceIfPossiblePodUpdateStrategyType,
+			}
+		}(),
+	}
+
+	client := fake.NewSimpleClientset()
+	kruiseClient := kruisefake.NewSimpleClientset(set)
+	spc, _, ssc, stop := setupController(client, kruiseClient)
+	defer close(stop)
+	if err := scaleUpStatefulSetControl(set, ssc, spc, assertBurstInvariants); err != nil {
+		t.Fatal(err)
+	}
+	set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
+	if err != nil {
+		t.Fatal(err)
+	}
+	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// mark pod 0 for specified delete
+	originalPods, err := spc.setPodSpecifiedDelete(set, 0)
+	if err != nil {
+		t.Fatal(err)
+	}
+	sort.Sort(ascendingOrdinal(originalPods))
+
+	// start the rolling update
+	set.Spec.Template.Spec.Containers[0].Image = "foo"
+
+	// first reconcile: delete pod 0 for specified delete and begin updating pods at or above the partition
+	if err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
+		t.Fatal(err)
+	}
+	pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// second reconcile: recreate pod 0 and finish in-place updating pods 4 and 5
+	if err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+		t.Fatal(err)
+	}
+	pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(pods) != 6 {
+		t.Fatalf("Expected 6 pods, got %v", pods)
+	}
+	sort.Sort(ascendingOrdinal(pods))
+	_, exist := pods[0].Labels[appsv1alpha1.SpecifiedDeleteKey]
+	assert.False(t, exist)
+	// pod 0 keeps the old image; pods 4 and 5 have the new one
+	assert.Equal(t, "foo", pods[5].Spec.Containers[0].Image)
+	assert.Equal(t, "foo", pods[4].Spec.Containers[0].Image)
+	assert.Equal(t, "nginx", pods[0].Spec.Containers[0].Image)
+
+	// mark pods 1/2/5 for specified delete and set all pods to ready
+	spc.setPodSpecifiedDelete(set, 1)
+	spc.setPodSpecifiedDelete(set, 2)
+	for i := 0; i < 6; i++ {
+		spc.setPodRunning(set, i)
+		spc.setPodReady(set, i)
+	}
+	originalPods, _ = spc.setPodSpecifiedDelete(set, 5)
+	sort.Sort(ascendingOrdinal(originalPods))
+
+	// third reconcile: delete pods 1/2/5 for specified delete; pod 3 cannot be updated since maxUnavailable is reached
+	if err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
+		t.Fatal(err)
+	}
+	pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// fourth reconcile: recreate pods 1/2/5; pod 3 still keeps the old revision
+	if err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+		t.Fatal(err)
+	}
+	pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
+	if err != nil {
+		t.Fatal(err)
+	}
+	sort.Sort(ascendingOrdinal(pods))
+	if len(pods) != 6 {
+		t.Fatalf("Expected 6 pods, got %v", pods)
+	}
+
+	_, exist = pods[5].Labels[appsv1alpha1.SpecifiedDeleteKey]
+	assert.False(t, exist)
+	_, exist = pods[2].Labels[appsv1alpha1.SpecifiedDeleteKey]
+	assert.False(t, exist)
+	_, exist = pods[1].Labels[appsv1alpha1.SpecifiedDeleteKey]
+	assert.False(t, exist)
+	assert.Equal(t, "foo", pods[5].Spec.Containers[0].Image)
+	assert.Equal(t, "nginx", pods[3].Spec.Containers[0].Image)
+	assert.Equal(t, "nginx", pods[2].Spec.Containers[0].Image)
+	assert.Equal(t, "nginx", pods[1].Spec.Containers[0].Image)
+}
+
 func TestStatefulSetControlInPlaceUpdate(t *testing.T) {
 	set := burst(newStatefulSet(3))
 	var partition int32 = 1
@@ -3119,6 +3234,21 @@ func (om *fakeObjectManager) setPodTerminated(set *appsv1beta1.StatefulSet, ordi
 	return om.podsLister.Pods(set.Namespace).List(selector)
 }
 
+func (om *fakeObjectManager) setPodSpecifiedDelete(set *appsv1beta1.StatefulSet, ordinal int) ([]*v1.Pod, error) {
+	pod := newStatefulSetPod(set, ordinal)
+	if pod.Labels == nil {
+		pod.Labels = make(map[string]string)
+	}
+	pod.Labels[appsv1alpha1.SpecifiedDeleteKey] = "true"
+	fakeResourceVersion(pod)
+	om.podsIndexer.Update(pod)
+	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
+	if err != nil {
+		return nil, err
+	}
+	return om.podsLister.Pods(set.Namespace).List(selector)
+}
+
 var _ StatefulPodControlObjectManager = &fakeObjectManager{}
 
 type fakeStatefulSetStatusUpdater struct {
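
Reviewer note on the helper this patch leans on: specifieddelete.IsSpecifiedDelete is what gates the new deletion loop in rollingUpdateStatefulsetPods. A minimal sketch of an equivalent predicate, assuming the label key mirrors appsv1alpha1.SpecifiedDeleteKey ("apps.kruise.io/specified-delete") and that bare presence of the label is what counts; the authoritative definition lives in pkg/util/specifieddelete, so treat this as illustrative only:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Assumed label key; mirrors appsv1alpha1.SpecifiedDeleteKey in the Kruise API types.
const specifiedDeleteKey = "apps.kruise.io/specified-delete"

// isSpecifiedDelete sketches the predicate the controller relies on: a pod is a
// deletion candidate when the specified-delete label is present on it.
func isSpecifiedDelete(obj metav1.Object) bool {
	_, ok := obj.GetLabels()[specifiedDeleteKey]
	return ok
}

func main() {
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{
		Name:   "sample-3",
		Labels: map[string]string{specifiedDeleteKey: "true"},
	}}
	fmt.Println(isSpecifiedDelete(pod)) // true
}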
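
On the user-facing side, the test's setPodSpecifiedDelete fake stands in for what a client would do against a real cluster: patch the label onto the pod. A minimal sketch with standard client-go, where the namespace "default", pod name "sample-3", and the kubeconfig path are placeholder assumptions:

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Assumption: kubeconfig at the default home path.
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// Adding the specified-delete label asks the Advanced StatefulSet controller
	// to delete (and then recreate) this pod, subject to the maxUnavailable
	// budget enforced by the loop added in this patch.
	patch := []byte(`{"metadata":{"labels":{"apps.kruise.io/specified-delete":"true"}}}`)
	_, err = client.CoreV1().Pods("default").Patch(context.TODO(), "sample-3",
		types.StrategicMergePatchType, patch, metav1.PatchOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Println("pod sample-3 marked for specified delete")
}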