Skip to content

Commit

Permalink
Sidecar terminator ignores the exit code of the sidecar container
Browse files Browse the repository at this point in the history
Signed-off-by: liuzhenwei <[email protected]>

add ut

Signed-off-by: liuzhenwei <[email protected]>

add crr event handler ut

Signed-off-by: liuzhenwei <[email protected]>

fix crr status

Signed-off-by: liuzhenwei <[email protected]>

fix, support kubelet and crr controller report pod status

Signed-off-by: liuzhenwei <[email protected]>

add some comments and simplify some code

Signed-off-by: liuzhenwei <[email protected]>
  • Loading branch information
diannaowa committed Jun 6, 2023
1 parent 40e62c6 commit a1e7bd6
Show file tree
Hide file tree
Showing 4 changed files with 390 additions and 28 deletions.
168 changes: 156 additions & 12 deletions pkg/controller/sidecarterminator/sidecar_terminator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,37 @@ import (
"strings"
"time"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/features"
utilclient "github.com/openkruise/kruise/pkg/util/client"
utilfeature "github.com/openkruise/kruise/pkg/util/feature"
"github.com/openkruise/kruise/pkg/util/ratelimiter"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

"k8s.io/apimachinery/pkg/types"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/features"
utilclient "github.com/openkruise/kruise/pkg/util/client"
utilfeature "github.com/openkruise/kruise/pkg/util/feature"
"github.com/openkruise/kruise/pkg/util/ratelimiter"
)

// init registers the "sidecarterminator-workers" command-line flag, which
// overrides concurrentReconciles; the flag's default is the value
// concurrentReconciles holds when init runs.
func init() {
	const (
		flagName  = "sidecarterminator-workers"
		flagUsage = "Max concurrent workers for SidecarTerminator controller."
	)
	flag.IntVar(&concurrentReconciles, flagName, concurrentReconciles, flagUsage)
}

var (
concurrentReconciles = 3
concurrentReconciles = 3
sidecarTerminated corev1.PodConditionType = "SidecarTerminated"
)

/**
Expand All @@ -70,6 +77,7 @@ func newReconciler(mgr manager.Manager) reconcile.Reconciler {
Client: cli,
recorder: recorder,
scheme: mgr.GetScheme(),
clock: clock.RealClock{},
}
}

Expand Down Expand Up @@ -99,6 +107,7 @@ type ReconcileSidecarTerminator struct {
client.Client
recorder record.EventRecorder
scheme *runtime.Scheme
clock clock.Clock
}

// Reconcile gets the pod whose sidecar containers should be stopped, and stops them.
Expand Down Expand Up @@ -129,8 +138,8 @@ func (r *ReconcileSidecarTerminator) doReconcile(pod *corev1.Pod) (reconcile.Res
return reconcile.Result{}, nil
}

if containersCompleted(pod, getSidecar(pod)) {
klog.V(3).Infof("SidecarTerminator -- all sidecars of pod(%v/%v) have been completed, no need to process", pod.Namespace, pod.Name)
if containersSucceeded(pod, getSidecar(pod)) {
klog.V(3).Infof("SidecarTerminator -- all sidecars of pod(%v/%v) have been succeeded, no need to process", pod.Namespace, pod.Name)
return reconcile.Result{}, nil
}

Expand All @@ -139,7 +148,8 @@ func (r *ReconcileSidecarTerminator) doReconcile(pod *corev1.Pod) (reconcile.Res
return reconcile.Result{}, nil
}

sidecarNeedToExecuteKillContainer, sidecarNeedToExecuteInPlaceUpdate, err := r.groupSidecars(pod)
sidecarNeedToExecuteKillContainer, sidecarNeedToExecuteInPlaceUpdate, sidecarNeedToSyncStatus, err := r.groupSidecars(pod)

if err != nil {
return reconcile.Result{}, err
}
Expand All @@ -152,23 +162,134 @@ func (r *ReconcileSidecarTerminator) doReconcile(pod *corev1.Pod) (reconcile.Res
return reconcile.Result{}, err
}

if sidecarNeedToSyncStatus.Len() > 0 {
if err := r.terminateJobPod(pod, sidecarNeedToSyncStatus); err != nil {
return reconcile.Result{}, err
}
}

return reconcile.Result{}, nil
}

func (r *ReconcileSidecarTerminator) groupSidecars(pod *corev1.Pod) (sets.String, sets.String, error) {
// terminateJobPod moves a job pod to a terminal phase without waiting for the
// kubelet to report the final state of its sidecar containers.
//
// This method should only be called after executeKillContainerAction has been
// called for the given sidecars. For every reported container status whose
// name is in sidecars it records the status that will be published:
//   - a sidecar that already terminated with a non-zero exit code keeps the
//     status it currently has (its exit code is ignored when the pod phase is
//     decided below);
//   - a sidecar that is still running is rewritten as terminated with exit
//     code 0 and reason "Completed".
//
// The pod is then re-read and its status is updated, retrying on conflict:
// the phase becomes PodSucceeded when containersSucceeded reports true for
// the main containers and PodFailed otherwise, the PodReady/ContainersReady
// conditions are adjusted to match, and the SidecarTerminated condition is
// added or its transition time refreshed.
//
// It returns nil when there is nothing to do, otherwise the result of the
// last Get / status Update attempt.
func (r *ReconcileSidecarTerminator) terminateJobPod(pod *corev1.Pod, sidecars sets.String) error {
	// Skip syncing the sidecar status if the job has already completed: the
	// real status reported by the kubelet will then be stored in the state of
	// the sidecar container. Until the job reaches a completed phase the pod
	// is processed repeatedly, because the kubelet and the logic below may
	// report different statuses for the sidecar container.
	if deduceWhetherTheJobIsCompletedFromThePod(pod) {
		klog.V(3).Infof("SidecarTerminator -- we can deduce whether the job is completed from the main container status of the pod(%v/%v) and pod phase,no need to process", pod.Namespace, pod.Name)
		return nil
	}

	var changed bool
	newSidecarStatus := make(map[string]corev1.ContainerStatus, sidecars.Len())
	// Range over the statuses that were actually reported instead of indexing
	// them by the position of the spec containers: the status list may be
	// shorter than (or ordered differently from) pod.Spec.Containers, and
	// indexing it blindly would panic with an out-of-range access.
	for i := range pod.Status.ContainerStatuses {
		status := &pod.Status.ContainerStatuses[i]
		if !sidecars.Has(status.Name) {
			continue
		}

		changed = true

		// The kubelet may update the state of the containers and the phase of
		// the pod before the sidecar terminator controller does.
		switch {
		case status.State.Terminated != nil && status.State.Terminated.ExitCode != 0:
			klog.V(3).Infof("SidecarTerminator -- ignore the non-zero exit code of the sidecar container %s/%s, the pod phase will be updated", pod.Name, status.Name)
			newSidecarStatus[status.Name] = *status
		case status.State.Terminated == nil && status.State.Running != nil:
			klog.V(3).Infof("SidecarTerminator -- sync the status of the sidecar container %s/%s and update the pod phase, all of the main containers are completed", pod.Name, status.Name)
			// Use a dedicated variable so that Started does not alias a field
			// of a struct that is copied into the map right afterwards.
			started := false
			newStatus := *status.DeepCopy()
			newStatus.Ready = false
			newStatus.Started = &started
			newStatus.State = corev1.ContainerState{
				Terminated: &corev1.ContainerStateTerminated{
					ExitCode:    int32(0),
					Reason:      "Completed",
					StartedAt:   status.State.Running.StartedAt,
					FinishedAt:  metav1.NewTime(r.clock.Now()),
					ContainerID: status.ContainerID,
				},
			}
			newSidecarStatus[status.Name] = newStatus
		}
	}
	if !changed {
		return nil
	}

	return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
		latestPod := &corev1.Pod{}
		key := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
		if err := r.Get(context.TODO(), key, latestPod); err != nil {
			return err
		}

		// Maybe some sidecar containers are still pending; do nothing until a
		// status has been recorded for every sidecar.
		if getSidecar(latestPod).Len() != len(newSidecarStatus) {
			return nil
		}

		// Overwrite the reported statuses of the sidecars by name.
		for i := range latestPod.Status.ContainerStatuses {
			name := latestPod.Status.ContainerStatuses[i].Name
			if status, ok := newSidecarStatus[name]; ok {
				latestPod.Status.ContainerStatuses[i] = status
			}
		}

		// Terminate the pod, ignoring the states of the sidecar containers.
		// The pod phase will not change again after this update, since pods
		// are not allowed to transition out of terminal phases when the
		// kubelet reports the pod status.
		phase := corev1.PodFailed
		reason := "PodFailed"
		readyStatus := corev1.ConditionFalse
		if containersSucceeded(latestPod, getMain(latestPod)) {
			phase = corev1.PodSucceeded
			reason = "PodCompleted"
			readyStatus = corev1.ConditionTrue
		}
		latestPod.Status.Phase = phase
		for i := range latestPod.Status.Conditions {
			condition := &latestPod.Status.Conditions[i]
			if condition.Type == corev1.PodReady || condition.Type == corev1.ContainersReady {
				condition.Reason = reason
				condition.Status = readyStatus
			}
		}

		// Record (or refresh) the SidecarTerminated condition, using the
		// injected clock so the timestamp is consistent with FinishedAt above.
		now := metav1.NewTime(r.clock.Now())
		if condition := getSidecarTerminatedCondition(latestPod, sidecarTerminated); condition == nil {
			latestPod.Status.Conditions = append(latestPod.Status.Conditions, corev1.PodCondition{
				Type:               sidecarTerminated,
				Status:             corev1.ConditionTrue,
				LastTransitionTime: now,
			})
		} else {
			condition.LastTransitionTime = now
		}

		return r.Status().Update(context.TODO(), latestPod)
	})
}

func (r *ReconcileSidecarTerminator) groupSidecars(pod *corev1.Pod) (sets.String, sets.String, sets.String, error) {
runningOnVK, err := IsPodRunningOnVirtualKubelet(pod, r.Client)
if err != nil {
return nil, nil, client.IgnoreNotFound(err)
return nil, nil, nil, client.IgnoreNotFound(err)
}

inPlaceUpdate := sets.NewString()
killContainer := sets.NewString()
syncStatusContainer := sets.NewString()
for i := range pod.Spec.Containers {
container := &pod.Spec.Containers[i]
for j := range container.Env {
if !runningOnVK && container.Env[j].Name == appsv1alpha1.KruiseTerminateSidecarEnv &&
strings.EqualFold(container.Env[j].Value, "true") {
killContainer.Insert(container.Name)
syncStatusContainer.Insert(container.Name)
break
}
if container.Env[j].Name == appsv1alpha1.KruiseTerminateSidecarWithImageEnv &&
Expand All @@ -177,7 +298,7 @@ func (r *ReconcileSidecarTerminator) groupSidecars(pod *corev1.Pod) (sets.String
}
}
}
return killContainer, inPlaceUpdate, nil
return killContainer, inPlaceUpdate, syncStatusContainer, nil
}

func containersCompleted(pod *corev1.Pod, containers sets.String) bool {
Expand Down Expand Up @@ -208,3 +329,26 @@ func containersSucceeded(pod *corev1.Pod, containers sets.String) bool {
}
return true
}

// deduceWhetherTheJobIsCompletedFromThePod reports whether the pod phase
// already agrees with the outcome of the pod's main containers.
//
// It returns false while containersCompleted is false for the main
// containers. Once they are completed it returns true only if the phase is
// PodSucceeded when containersSucceeded is true for them, or PodFailed when it
// is not.
func deduceWhetherTheJobIsCompletedFromThePod(pod *corev1.Pod) bool {
	mainNames := getMain(pod)
	if !containersCompleted(pod, mainNames) {
		return false
	}

	expected := corev1.PodFailed
	if containersSucceeded(pod, mainNames) {
		expected = corev1.PodSucceeded
	}
	return pod.Status.Phase == expected
}
// getSidecarTerminatedCondition returns a pointer to the first condition in
// pod.Status.Conditions whose type equals condType, so callers can modify the
// condition in place. It returns nil when pod is nil or no condition matches.
func getSidecarTerminatedCondition(pod *corev1.Pod, condType corev1.PodConditionType) *corev1.PodCondition {
	if pod == nil {
		return nil
	}

	conditions := pod.Status.Conditions
	for idx := 0; idx < len(conditions); idx++ {
		if conditions[idx].Type != condType {
			continue
		}
		// The local slice shares its backing array with the pod, so this
		// pointer refers to the pod's own condition entry.
		return &conditions[idx]
	}
	return nil
}
Loading

0 comments on commit a1e7bd6

Please sign in to comment.