Skip to content

Commit

Permalink
optimize cloneset event handler codes (openkruise#1219)
Browse files Browse the repository at this point in the history
Signed-off-by: mingzhou.swx <[email protected]>
Co-authored-by: mingzhou.swx <[email protected]>
  • Loading branch information
2 people authored and diannaowa committed Jun 2, 2023
1 parent cd7df49 commit be2eeb6
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 78 deletions.
79 changes: 1 addition & 78 deletions pkg/controller/cloneset/cloneset_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ package cloneset

import (
"context"
"encoding/json"
"fmt"
"reflect"
"strings"
"time"
Expand All @@ -31,21 +29,17 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/api/v1/pod"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

appspub "github.com/openkruise/kruise/apis/apps/pub"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
clonesetcore "github.com/openkruise/kruise/pkg/controller/cloneset/core"
clonesetutils "github.com/openkruise/kruise/pkg/controller/cloneset/utils"
"github.com/openkruise/kruise/pkg/features"
"github.com/openkruise/kruise/pkg/util/expectations"
utilfeature "github.com/openkruise/kruise/pkg/util/feature"
"github.com/openkruise/kruise/pkg/util/inplaceupdate"
)

var (
Expand Down Expand Up @@ -179,59 +173,12 @@ func (e *podEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimiting
}

func (e *podEventHandler) shouldIgnoreUpdate(req *reconcile.Request, oldPod, curPod *v1.Pod) bool {
if curPod.Generation != oldPod.Generation {
return false
}

cs := &appsv1alpha1.CloneSet{}
if err := e.Get(context.TODO(), req.NamespacedName, cs); err != nil {
return false
}

if lifecycleFinalizerChanged(cs, oldPod, curPod) {
return false
}

coreControl := clonesetcore.New(cs)
if coreControl.IsPodUpdatePaused(oldPod) != coreControl.IsPodUpdatePaused(curPod) {
return false
}

containsReadinessGate := func(pod *v1.Pod) bool {
for _, r := range pod.Spec.ReadinessGates {
if r.ConditionType == appspub.InPlaceUpdateReady {
return true
}
}
return false
}

if containsReadinessGate(curPod) {
opts := coreControl.GetUpdateOptions()
opts = inplaceupdate.SetOptionsDefaults(opts)
if err := containersUpdateCompleted(curPod, opts.CheckContainersUpdateCompleted); err == nil {
if cond := inplaceupdate.GetCondition(curPod); cond == nil || cond.Status != v1.ConditionTrue {
return false
}
}
}

if pod.IsPodReady(oldPod) != pod.IsPodReady(curPod) {
return false
}

return true
}

func containersUpdateCompleted(pod *v1.Pod, checkFunc func(pod *v1.Pod, state *appspub.InPlaceUpdateState) error) error {
if stateStr, ok := appspub.GetInPlaceUpdateState(pod); ok {
state := appspub.InPlaceUpdateState{}
if err := json.Unmarshal([]byte(stateStr), &state); err != nil {
return err
}
return checkFunc(pod, &state)
}
return fmt.Errorf("pod %v has no in-place update state annotation", klog.KObj(pod))
return clonesetcore.New(cs).IgnorePodUpdateEvent(oldPod, curPod)
}

func (e *podEventHandler) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
Expand Down Expand Up @@ -261,30 +208,6 @@ func (e *podEventHandler) Generic(evt event.GenericEvent, q workqueue.RateLimiti

}

func lifecycleFinalizerChanged(cs *appsv1alpha1.CloneSet, oldPod, curPod *v1.Pod) bool {
if cs.Spec.Lifecycle == nil {
return false
}

if cs.Spec.Lifecycle.PreDelete != nil {
for _, f := range cs.Spec.Lifecycle.PreDelete.FinalizersHandler {
if controllerutil.ContainsFinalizer(oldPod, f) != controllerutil.ContainsFinalizer(curPod, f) {
return true
}
}
}

if cs.Spec.Lifecycle.InPlaceUpdate != nil {
for _, f := range cs.Spec.Lifecycle.InPlaceUpdate.FinalizersHandler {
if controllerutil.ContainsFinalizer(oldPod, f) != controllerutil.ContainsFinalizer(curPod, f) {
return true
}
}
}

return false
}

func resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *reconcile.Request {
// Parse the Group out of the OwnerReference to compare it to what was parsed out of the requested OwnerType
refGV, err := schema.ParseGroupVersion(controllerRef.APIVersion)
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller/cloneset/core/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ type Control interface {

// validation
ValidateCloneSetUpdate(oldCS, newCS *appsv1alpha1.CloneSet) error

// event handler
IgnorePodUpdateEvent(oldPod, newPod *v1.Pod) bool
}

func New(cs *appsv1alpha1.CloneSet) Control {
Expand Down
78 changes: 78 additions & 0 deletions pkg/controller/cloneset/core/cloneset_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,16 @@ import (
"regexp"

"github.com/appscode/jsonpatch"
appspub "github.com/openkruise/kruise/apis/apps/pub"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
clonesetutils "github.com/openkruise/kruise/pkg/controller/cloneset/utils"
"github.com/openkruise/kruise/pkg/util/inplaceupdate"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
kubecontroller "k8s.io/kubernetes/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

var (
Expand Down Expand Up @@ -165,3 +169,77 @@ func (c *commonControl) ValidateCloneSetUpdate(oldCS, newCS *appsv1alpha1.CloneS
func (c *commonControl) ExtraStatusCalculation(status *appsv1alpha1.CloneSetStatus, pods []*v1.Pod) error {
return nil
}

func (c *commonControl) IgnorePodUpdateEvent(oldPod, curPod *v1.Pod) bool {
if oldPod.Generation != curPod.Generation {
return false
}

if lifecycleFinalizerChanged(c.CloneSet, oldPod, curPod) {
return false
}

if c.IsPodUpdatePaused(oldPod) != c.IsPodUpdatePaused(curPod) {
return false
}

if podutil.IsPodReady(oldPod) != podutil.IsPodReady(curPod) {
return false
}

containsReadinessGate := func(pod *v1.Pod) bool {
for _, r := range pod.Spec.ReadinessGates {
if r.ConditionType == appspub.InPlaceUpdateReady {
return true
}
}
return false
}

if containsReadinessGate(curPod) {
opts := c.GetUpdateOptions()
opts = inplaceupdate.SetOptionsDefaults(opts)
if err := containersUpdateCompleted(curPod, opts.CheckContainersUpdateCompleted); err == nil {
if cond := inplaceupdate.GetCondition(curPod); cond == nil || cond.Status != v1.ConditionTrue {
return false
}
}
}

return true
}

func containersUpdateCompleted(pod *v1.Pod, checkFunc func(pod *v1.Pod, state *appspub.InPlaceUpdateState) error) error {
if stateStr, ok := appspub.GetInPlaceUpdateState(pod); ok {
state := appspub.InPlaceUpdateState{}
if err := json.Unmarshal([]byte(stateStr), &state); err != nil {
return err
}
return checkFunc(pod, &state)
}
return fmt.Errorf("pod %v has no in-place update state annotation", klog.KObj(pod))
}

func lifecycleFinalizerChanged(cs *appsv1alpha1.CloneSet, oldPod, curPod *v1.Pod) bool {
if cs.Spec.Lifecycle == nil {
return false
}

if cs.Spec.Lifecycle.PreDelete != nil {
for _, f := range cs.Spec.Lifecycle.PreDelete.FinalizersHandler {
if controllerutil.ContainsFinalizer(oldPod, f) != controllerutil.ContainsFinalizer(curPod, f) {
return true
}
}
}

if cs.Spec.Lifecycle.InPlaceUpdate != nil {
for _, f := range cs.Spec.Lifecycle.InPlaceUpdate.FinalizersHandler {
if controllerutil.ContainsFinalizer(oldPod, f) != controllerutil.ContainsFinalizer(curPod, f) {
return true
}
}
}

return false
}

0 comments on commit be2eeb6

Please sign in to comment.