diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 50607d8597..d500f30515 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -48,6 +48,7 @@ jobs: version: ${{ env.GOLANGCI_VERSION }} args: --verbose skip-pkg-cache: true + mod: readonly markdownlint-misspell-shellcheck: runs-on: ubuntu-18.04 diff --git a/.github/workflows/e2e-1.16.yaml b/.github/workflows/e2e-1.16.yaml index aaed35ec6b..240ce805c9 100644 --- a/.github/workflows/e2e-1.16.yaml +++ b/.github/workflows/e2e-1.16.yaml @@ -10,7 +10,7 @@ on: env: # Common versions - GO_VERSION: '1.17' + GO_VERSION: '1.18' KIND_VERSION: 'v0.14.0' KIND_IMAGE: 'kindest/node:v1.16.15' KIND_CLUSTER_NAME: 'ci-testing' @@ -85,6 +85,18 @@ jobs: kubectl get pod -n kruise-system --no-headers -l control-plane=controller-manager | awk '{print $1}' | xargs kubectl logs -p -n kruise-system exit 1 fi + kubectl get pods -n kruise-system -l control-plane=daemon -o=jsonpath="{range .items[*]}{.metadata.namespace}{\"\t\"}{.metadata.name}{\"\n\"}{end}" | while read ns name; + do + restartCount=$(kubectl get pod -n ${ns} ${name} --no-headers | awk '{print $4}') + if [ "${restartCount}" -eq "0" ];then + echo "Kruise-daemon has not restarted" + else + kubectl get pods -n ${ns} -l control-plane=daemon --no-headers + echo "Kruise-daemon has restarted, abort!!!" + kubectl logs -p -n ${ns} ${name} + exit 1 + fi + done exit $retVal pullimages-containerrecreate: @@ -154,6 +166,18 @@ jobs: kubectl get pod -n kruise-system --no-headers -l control-plane=controller-manager | awk '{print $1}' | xargs kubectl logs -p -n kruise-system exit 1 fi + kubectl get pods -n kruise-system -l control-plane=daemon -o=jsonpath="{range .items[*]}{.metadata.namespace}{\"\t\"}{.metadata.name}{\"\n\"}{end}" | while read ns name; + do + restartCount=$(kubectl get pod -n ${ns} ${name} --no-headers | awk '{print $4}') + if [ "${restartCount}" -eq "0" ];then + echo "Kruise-daemon has not restarted" + else + kubectl get pods -n ${ns} -l control-plane=daemon --no-headers + echo "Kruise-daemon has restarted, abort!!!" + kubectl logs -p -n ${ns} ${name} + exit 1 + fi + done exit $retVal advanced-daemonset: @@ -223,6 +247,18 @@ jobs: kubectl get pod -n kruise-system --no-headers -l control-plane=controller-manager | awk '{print $1}' | xargs kubectl logs -p -n kruise-system exit 1 fi + kubectl get pods -n kruise-system -l control-plane=daemon -o=jsonpath="{range .items[*]}{.metadata.namespace}{\"\t\"}{.metadata.name}{\"\n\"}{end}" | while read ns name; + do + restartCount=$(kubectl get pod -n ${ns} ${name} --no-headers | awk '{print $4}') + if [ "${restartCount}" -eq "0" ];then + echo "Kruise-daemon has not restarted" + else + kubectl get pods -n ${ns} -l control-plane=daemon --no-headers + echo "Kruise-daemon has restarted, abort!!!" + kubectl logs -p -n ${ns} ${name} + exit 1 + fi + done exit $retVal sidecarset: @@ -292,6 +328,18 @@ jobs: kubectl get pod -n kruise-system --no-headers -l control-plane=controller-manager | awk '{print $1}' | xargs kubectl logs -p -n kruise-system exit 1 fi + kubectl get pods -n kruise-system -l control-plane=daemon -o=jsonpath="{range .items[*]}{.metadata.namespace}{\"\t\"}{.metadata.name}{\"\n\"}{end}" | while read ns name; + do + restartCount=$(kubectl get pod -n ${ns} ${name} --no-headers | awk '{print $4}') + if [ "${restartCount}" -eq "0" ];then + echo "Kruise-daemon has not restarted" + else + kubectl get pods -n ${ns} -l control-plane=daemon --no-headers + echo "Kruise-daemon has restarted, abort!!!" 
+ kubectl logs -p -n ${ns} ${name} + exit 1 + fi + done exit $retVal podUnavailableBudget: @@ -419,4 +467,16 @@ jobs: kubectl get pod -n kruise-system --no-headers -l control-plane=controller-manager | awk '{print $1}' | xargs kubectl logs -p -n kruise-system exit 1 fi + kubectl get pods -n kruise-system -l control-plane=daemon -o=jsonpath="{range .items[*]}{.metadata.namespace}{\"\t\"}{.metadata.name}{\"\n\"}{end}" | while read ns name; + do + restartCount=$(kubectl get pod -n ${ns} ${name} --no-headers | awk '{print $4}') + if [ "${restartCount}" -eq "0" ];then + echo "Kruise-daemon has not restarted" + else + kubectl get pods -n ${ns} -l control-plane=daemon --no-headers + echo "Kruise-daemon has restarted, abort!!!" + kubectl logs -p -n ${ns} ${name} + exit 1 + fi + done exit $retVal diff --git a/.github/workflows/e2e-1.24.yaml b/.github/workflows/e2e-1.24.yaml index dda59a8615..2bf1c24348 100644 --- a/.github/workflows/e2e-1.24.yaml +++ b/.github/workflows/e2e-1.24.yaml @@ -154,6 +154,18 @@ jobs: kubectl get pod -n kruise-system --no-headers -l control-plane=controller-manager | awk '{print $1}' | xargs kubectl logs -p -n kruise-system exit 1 fi + kubectl get pods -n kruise-system -l control-plane=daemon -o=jsonpath="{range .items[*]}{.metadata.namespace}{\"\t\"}{.metadata.name}{\"\n\"}{end}" | while read ns name; + do + restartCount=$(kubectl get pod -n ${ns} ${name} --no-headers | awk '{print $4}') + if [ "${restartCount}" -eq "0" ];then + echo "Kruise-daemon has not restarted" + else + kubectl get pods -n ${ns} -l control-plane=daemon --no-headers + echo "Kruise-daemon has restarted, abort!!!" + kubectl logs -p -n ${ns} ${name} + exit 1 + fi + done exit $retVal advanced-daemonset: @@ -223,6 +235,18 @@ jobs: kubectl get pod -n kruise-system --no-headers -l control-plane=controller-manager | awk '{print $1}' | xargs kubectl logs -p -n kruise-system exit 1 fi + kubectl get pods -n kruise-system -l control-plane=daemon -o=jsonpath="{range .items[*]}{.metadata.namespace}{\"\t\"}{.metadata.name}{\"\n\"}{end}" | while read ns name; + do + restartCount=$(kubectl get pod -n ${ns} ${name} --no-headers | awk '{print $4}') + if [ "${restartCount}" -eq "0" ];then + echo "Kruise-daemon has not restarted" + else + kubectl get pods -n ${ns} -l control-plane=daemon --no-headers + echo "Kruise-daemon has restarted, abort!!!" + kubectl logs -p -n ${ns} ${name} + exit 1 + fi + done exit $retVal sidecarset: @@ -292,6 +316,18 @@ jobs: kubectl get pod -n kruise-system --no-headers -l control-plane=controller-manager | awk '{print $1}' | xargs kubectl logs -p -n kruise-system exit 1 fi + kubectl get pods -n kruise-system -l control-plane=daemon -o=jsonpath="{range .items[*]}{.metadata.namespace}{\"\t\"}{.metadata.name}{\"\n\"}{end}" | while read ns name; + do + restartCount=$(kubectl get pod -n ${ns} ${name} --no-headers | awk '{print $4}') + if [ "${restartCount}" -eq "0" ];then + echo "Kruise-daemon has not restarted" + else + kubectl get pods -n ${ns} -l control-plane=daemon --no-headers + echo "Kruise-daemon has restarted, abort!!!" 
+ kubectl logs -p -n ${ns} ${name} + exit 1 + fi + done exit $retVal ephemeraljob: @@ -477,4 +513,16 @@ jobs: kubectl get pod -n kruise-system --no-headers -l control-plane=controller-manager | awk '{print $1}' | xargs kubectl logs -p -n kruise-system exit 1 fi + kubectl get pods -n kruise-system -l control-plane=daemon -o=jsonpath="{range .items[*]}{.metadata.namespace}{\"\t\"}{.metadata.name}{\"\n\"}{end}" | while read ns name; + do + restartCount=$(kubectl get pod -n ${ns} ${name} --no-headers | awk '{print $4}') + if [ "${restartCount}" -eq "0" ];then + echo "Kruise-daemon has not restarted" + else + kubectl get pods -n ${ns} -l control-plane=daemon --no-headers + echo "Kruise-daemon has restarted, abort!!!" + kubectl logs -p -n ${ns} ${name} + exit 1 + fi + done exit $retVal diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index d12c9b6d31..3640e54e9b 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -88,7 +88,7 @@ We encourage contributors to follow the [PR template](./.github/PULL_REQUEST_TEM As a contributor, if you want to make any contribution to Kruise project, we should reach an agreement on the version of tools used in the development environment. Here are some dependents with specific version: -- Golang : v1.17+ +- Golang : v1.18+ - Kubernetes: v1.16+ ### Developing guide diff --git a/Dockerfile b/Dockerfile index 9fcd11c772..6feb06d44e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ # Build the manager and daemon binaries -FROM golang:1.17 as builder +FROM golang:1.18 as builder WORKDIR /workspace # Copy the Go Modules manifests diff --git a/Dockerfile_multiarch b/Dockerfile_multiarch index 353e66620c..1c280f5e19 100644 --- a/Dockerfile_multiarch +++ b/Dockerfile_multiarch @@ -1,5 +1,5 @@ # Build the manager and daemon binaries -FROM --platform=$BUILDPLATFORM golang:1.17 as builder +FROM --platform=$BUILDPLATFORM golang:1.18 as builder WORKDIR /workspace # Copy the Go Modules manifests diff --git a/Makefile b/Makefile index fd3af92d03..c25f07e6b3 100644 --- a/Makefile +++ b/Makefile @@ -22,7 +22,7 @@ all: build ##@ Development go_check: - @scripts/check_go_version "1.17.0" + @scripts/check_go_version "1.18.0" generate: controller-gen ## Generate code containing DeepCopy, DeepCopyInto, and DeepCopyObject method implementations. 
@scripts/generate_client.sh diff --git a/config/rbac/daemon_role.yaml b/config/rbac/daemon_role.yaml index a1b5a732f0..e87ce2a1b7 100644 --- a/config/rbac/daemon_role.yaml +++ b/config/rbac/daemon_role.yaml @@ -72,3 +72,19 @@ rules: - get - list - watch +- apiGroups: + - apps.kruise.io + resources: + - nodepodprobes + verbs: + - get + - list + - watch +- apiGroups: + - apps.kruise.io + resources: + - nodepodprobes/status + verbs: + - get + - patch + - update diff --git a/go.mod b/go.mod index b058b229de..36d0b0c6f6 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/openkruise/kruise -go 1.17 +go 1.18 require ( github.com/alibaba/pouch v0.0.0-20190328125340-37051654f368 diff --git a/pkg/controller/nodepodprobe/node_pod_probe_controller.go b/pkg/controller/nodepodprobe/node_pod_probe_controller.go index 71302ad957..eb8d2354e1 100644 --- a/pkg/controller/nodepodprobe/node_pod_probe_controller.go +++ b/pkg/controller/nodepodprobe/node_pod_probe_controller.go @@ -23,18 +23,19 @@ import ( "strings" "time" - "k8s.io/apimachinery/pkg/util/sets" - appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "github.com/openkruise/kruise/pkg/features" "github.com/openkruise/kruise/pkg/util" utilclient "github.com/openkruise/kruise/pkg/util/client" "github.com/openkruise/kruise/pkg/util/controllerfinder" utildiscovery "github.com/openkruise/kruise/pkg/util/discovery" + utilfeature "github.com/openkruise/kruise/pkg/util/feature" "github.com/openkruise/kruise/pkg/util/ratelimiter" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/util/retry" "k8s.io/klog/v2" kubecontroller "k8s.io/kubernetes/pkg/controller" @@ -67,7 +68,7 @@ var ( // Add creates a new NodePodProbe Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller // and Start it when the Manager is Started. 
func Add(mgr manager.Manager) error { - if !utildiscovery.DiscoverGVK(controllerKind) { + if !utildiscovery.DiscoverGVK(controllerKind) || !utilfeature.DefaultFeatureGate.Enabled(features.PodProbeMarkerGate) { return nil } return add(mgr, newReconciler(mgr)) @@ -220,9 +221,10 @@ func (r *ReconcileNodePodProbe) syncPodFromNodePodProbe(npp *appsv1alpha1.NodePo func (r *ReconcileNodePodProbe) updatePodProbeStatus(pod *corev1.Pod, status appsv1alpha1.PodProbeStatus) error { // map[probe.name]->probeState - currentConditions := make(map[string]appsv1alpha1.ProbeState) - for _, condition := range pod.Status.Conditions { - currentConditions[string(condition.Type)] = appsv1alpha1.ProbeState(condition.Status) + currentConditions := make(map[string]*corev1.PodCondition) + for i := range pod.Status.Conditions { + condition := &pod.Status.Conditions[i] + currentConditions[string(condition.Type)] = condition } type metadata struct { Labels map[string]interface{} `json:"labels,omitempty"` @@ -239,11 +241,9 @@ func (r *ReconcileNodePodProbe) updatePodProbeStatus(pod *corev1.Pod, status app validConditionTypes := sets.NewString() for i := range status.ProbeStates { probeState := status.ProbeStates[i] - // ignore the probe state - if probeState.State == "" || probeState.State == currentConditions[probeState.Name] { + if probeState.State == "" { continue } - // fetch podProbeMarker ppmName, probeName := strings.Split(probeState.Name, "#")[0], strings.Split(probeState.Name, "#")[1] ppm := &appsv1alpha1.PodProbeMarker{} @@ -267,7 +267,6 @@ func (r *ReconcileNodePodProbe) updatePodProbeStatus(pod *corev1.Pod, status app break } } - if conditionType != "" && validConditionTypes.Has(conditionType) { klog.Warningf("NodePodProbe(%s) pod(%s/%s) condition(%s) is conflict", ppmName, pod.Namespace, pod.Name, conditionType) // patch pod condition @@ -287,11 +286,9 @@ func (r *ReconcileNodePodProbe) updatePodProbeStatus(pod *corev1.Pod, status app Message: probeState.Message, }) } - if len(policy) == 0 { continue } - // matchedPolicy is when policy.state is equal to probeState.State, otherwise oppositePolicy // 1. If policy[0].state = Succeeded, policy[1].state = Failed. probeState.State = Succeeded. 
// So policy[0] is matchedPolicy, policy[1] is oppositePolicy @@ -328,7 +325,6 @@ func (r *ReconcileNodePodProbe) updatePodProbeStatus(pod *corev1.Pod, status app if len(probeConditions) == 0 && len(probeMetadata.Labels) == 0 && len(probeMetadata.Annotations) == 0 { return nil } - //update pod metadata and status condition podClone := pod.DeepCopy() if err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { @@ -336,7 +332,7 @@ func (r *ReconcileNodePodProbe) updatePodProbeStatus(pod *corev1.Pod, status app klog.Errorf("error getting updated pod(%s/%s) from client", pod.Namespace, pod.Name) return err } - oldStatus := podClone.DeepCopy() + oldStatus := podClone.Status.DeepCopy() for i := range probeConditions { condition := probeConditions[i] util.SetPodCondition(podClone, condition) @@ -363,7 +359,7 @@ func (r *ReconcileNodePodProbe) updatePodProbeStatus(pod *corev1.Pod, status app podClone.Annotations[k] = v.(string) } } - if reflect.DeepEqual(oldStatus, podClone.Status) && reflect.DeepEqual(oldMetadata.Labels, podClone.Labels) && + if reflect.DeepEqual(oldStatus.Conditions, podClone.Status.Conditions) && reflect.DeepEqual(oldMetadata.Labels, podClone.Labels) && reflect.DeepEqual(oldMetadata.Annotations, podClone.Annotations) { return nil } @@ -372,6 +368,7 @@ func (r *ReconcileNodePodProbe) updatePodProbeStatus(pod *corev1.Pod, status app klog.Errorf("NodePodProbe patch pod(%s/%s) status failed: %s", podClone.Namespace, podClone.Name, err.Error()) return err } - klog.V(3).Infof("NodePodProbe update pod(%s/%s) status conditions(%s) success", podClone.Namespace, podClone.Name, util.DumpJSON(probeConditions)) + klog.V(3).Infof("NodePodProbe update pod(%s/%s) metadata(%s) conditions(%s) success", podClone.Namespace, podClone.Name, + util.DumpJSON(probeMetadata), util.DumpJSON(probeConditions)) return nil } diff --git a/pkg/controller/nodepodprobe/nodepodprobe_event_handler.go b/pkg/controller/nodepodprobe/nodepodprobe_event_handler.go index 8e30e56939..0f2bb4c4e8 100644 --- a/pkg/controller/nodepodprobe/nodepodprobe_event_handler.go +++ b/pkg/controller/nodepodprobe/nodepodprobe_event_handler.go @@ -24,7 +24,6 @@ import ( appsalphav1 "github.com/openkruise/kruise/apis/apps/v1alpha1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" @@ -64,34 +63,9 @@ func (p *enqueueRequestForNodePodProbe) Update(evt event.UpdateEvent, q workqueu if !ok { return } - if checkNodePodProbeStatusEqual(new.Status.DeepCopy(), old.Status.DeepCopy()) { - return - } - p.queue(q, new) -} - -func checkNodePodProbeStatusEqual(obj1, obj2 *appsalphav1.NodePodProbeStatus) bool { - // ignore LastProbeTime, LastTransitionTime and Message - t := metav1.Now() - for i := range obj1.PodProbeStatuses { - podProbe := &obj1.PodProbeStatuses[i] - for j := range podProbe.ProbeStates { - state := &podProbe.ProbeStates[j] - state.LastProbeTime = t - state.LastTransitionTime = t - state.Message = "" - } - } - for i := range obj2.PodProbeStatuses { - podProbe := &obj2.PodProbeStatuses[i] - for j := range podProbe.ProbeStates { - state := &podProbe.ProbeStates[j] - state.LastProbeTime = t - state.LastTransitionTime = t - state.Message = "" - } + if !reflect.DeepEqual(new.Status, old.Status) { + p.queue(q, new) } - return reflect.DeepEqual(obj1, obj2) } func (p *enqueueRequestForNodePodProbe) queue(q workqueue.RateLimitingInterface, npp *appsalphav1.NodePodProbe) { diff --git 
a/pkg/controller/podprobemarker/pod_probe_marker_controller.go b/pkg/controller/podprobemarker/pod_probe_marker_controller.go index f3e9d27673..13d49e9856 100644 --- a/pkg/controller/podprobemarker/pod_probe_marker_controller.go +++ b/pkg/controller/podprobemarker/pod_probe_marker_controller.go @@ -24,10 +24,12 @@ import ( "strings" appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "github.com/openkruise/kruise/pkg/features" "github.com/openkruise/kruise/pkg/util" utilclient "github.com/openkruise/kruise/pkg/util/client" "github.com/openkruise/kruise/pkg/util/controllerfinder" utildiscovery "github.com/openkruise/kruise/pkg/util/discovery" + utilfeature "github.com/openkruise/kruise/pkg/util/feature" "github.com/openkruise/kruise/pkg/util/ratelimiter" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -66,7 +68,7 @@ const ( // Add creates a new PodProbeMarker Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller // and Start it when the Manager is Started. func Add(mgr manager.Manager) error { - if !utildiscovery.DiscoverGVK(controllerKind) { + if !utildiscovery.DiscoverGVK(controllerKind) || !utilfeature.DefaultFeatureGate.Enabled(features.PodProbeMarkerGate) { return nil } return add(mgr, newReconciler(mgr)) diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index a7c4db69d4..67b6bfcdbb 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -23,6 +23,17 @@ import ( "net/http" "sync" + kruiseapis "github.com/openkruise/kruise/apis" + "github.com/openkruise/kruise/pkg/client" + "github.com/openkruise/kruise/pkg/daemon/containermeta" + "github.com/openkruise/kruise/pkg/daemon/containerrecreate" + daemonruntime "github.com/openkruise/kruise/pkg/daemon/criruntime" + "github.com/openkruise/kruise/pkg/daemon/imagepuller" + daemonoptions "github.com/openkruise/kruise/pkg/daemon/options" + "github.com/openkruise/kruise/pkg/daemon/podprobe" + daemonutil "github.com/openkruise/kruise/pkg/daemon/util" + "github.com/openkruise/kruise/pkg/features" + utilfeature "github.com/openkruise/kruise/pkg/util/feature" "github.com/prometheus/client_golang/prometheus/promhttp" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -35,17 +46,6 @@ import ( "k8s.io/klog/v2" runtimeclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/metrics" - - kruiseapis "github.com/openkruise/kruise/apis" - "github.com/openkruise/kruise/pkg/client" - "github.com/openkruise/kruise/pkg/daemon/containermeta" - "github.com/openkruise/kruise/pkg/daemon/containerrecreate" - daemonruntime "github.com/openkruise/kruise/pkg/daemon/criruntime" - "github.com/openkruise/kruise/pkg/daemon/imagepuller" - daemonoptions "github.com/openkruise/kruise/pkg/daemon/options" - daemonutil "github.com/openkruise/kruise/pkg/daemon/util" - "github.com/openkruise/kruise/pkg/features" - utilfeature "github.com/openkruise/kruise/pkg/util/feature" ) const ( @@ -152,6 +152,15 @@ func NewDaemon(cfg *rest.Config, bindAddress string) (Daemon, error) { crrController, } + // node pod probe + if utilfeature.DefaultFeatureGate.Enabled(features.PodProbeMarkerGate) { + nppController, err := podprobe.NewController(opts) + if err != nil { + return nil, fmt.Errorf("failed to new nodePodProbe daemon controller: %v", err) + } + runnables = append(runnables, nppController) + } + if utilfeature.DefaultFeatureGate.Enabled(features.DaemonWatchingPod) { containerMetaController, err := containermeta.NewController(opts) if err != nil 
{ diff --git a/pkg/daemon/podprobe/pod_probe_controller.go b/pkg/daemon/podprobe/pod_probe_controller.go new file mode 100644 index 0000000000..064a450b79 --- /dev/null +++ b/pkg/daemon/podprobe/pod_probe_controller.go @@ -0,0 +1,441 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podprobe + +import ( + "context" + "crypto/rand" + "fmt" + "math/big" + "net/http" + "reflect" + "sync" + "time" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "github.com/openkruise/kruise/pkg/client" + kruiseclient "github.com/openkruise/kruise/pkg/client/clientset/versioned" + clientalpha1 "github.com/openkruise/kruise/pkg/client/clientset/versioned/typed/apps/v1alpha1" + listersalpha1 "github.com/openkruise/kruise/pkg/client/listers/apps/v1alpha1" + daemonruntime "github.com/openkruise/kruise/pkg/daemon/criruntime" + daemonoptions "github.com/openkruise/kruise/pkg/daemon/options" + "github.com/openkruise/kruise/pkg/daemon/util" + commonutil "github.com/openkruise/kruise/pkg/util" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" + "k8s.io/gengo/examples/set-gen/sets" + "k8s.io/klog/v2" + kubelettypes "k8s.io/kubernetes/pkg/kubelet/types" +) + +const ( + EventKruiseProbeSucceeded = "KruiseProbeSucceeded" + EventKruiseProbeFailed = "KruiseProbeFailed" +) + +// Key uniquely identifying container probes +type probeKey struct { + podNs string + podName string + podUID string + containerName string + probeName string +} + +type Controller struct { + queue workqueue.RateLimitingInterface + updateQueue workqueue.RateLimitingInterface + nodePodProbeInformer cache.SharedIndexInformer + nodePodProbeLister listersalpha1.NodePodProbeLister + nodePodProbeClient clientalpha1.NodePodProbeInterface + // event + eventRecorder record.EventRecorder + // Map of active workers for probes + workers map[probeKey]*worker + // Lock for accessing & mutating workers + workerLock sync.RWMutex + runtimeFactory daemonruntime.Factory + // prober executes the probe actions. + prober *prober + // pod probe result manager + result *resultManager + // node name + nodeName string + // kruise daemon start time + start time.Time +} + +// NewController returns the controller for pod probe +func NewController(opts daemonoptions.Options) (*Controller, error) { + // pull the next work item from queue. 
It should be a key we use to lookup + // something in a cache + nodeName, err := util.NodeName() + if err != nil { + return nil, err + } + randInt, _ := rand.Int(rand.Reader, big.NewInt(5000)) + queue := workqueue.NewNamedRateLimitingQueue( + // Backoff duration from 500ms to 50~55s + workqueue.NewItemExponentialFailureRateLimiter(500*time.Millisecond, 50*time.Second+time.Millisecond*time.Duration(randInt.Int64())), + "sync_node_pod_probe", + ) + updateQueue := workqueue.NewNamedRateLimitingQueue( + // Backoff duration from 500ms to 50~55s + workqueue.NewItemExponentialFailureRateLimiter(500*time.Millisecond, 50*time.Second+time.Millisecond*time.Duration(randInt.Int64())), + "update_node_pod_probe_status", + ) + genericClient := client.GetGenericClientWithName("kruise-daemon-podprobe") + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: genericClient.KubeClient.CoreV1().Events("")}) + recorder := eventBroadcaster.NewRecorder(opts.Scheme, corev1.EventSource{Component: "kruise-daemon-podprobe", Host: opts.NodeName}) + informer := newNodePodProbeInformer(genericClient.KruiseClient, opts.NodeName) + c := &Controller{ + nodePodProbeInformer: informer, + nodePodProbeLister: listersalpha1.NewNodePodProbeLister(informer.GetIndexer()), + runtimeFactory: opts.RuntimeFactory, + workers: make(map[probeKey]*worker), + queue: queue, + updateQueue: updateQueue, + nodePodProbeClient: genericClient.KruiseClient.AppsV1alpha1().NodePodProbes(), + result: newResultManager(updateQueue), + nodeName: nodeName, + eventRecorder: recorder, + start: time.Now(), + } + c.prober = newProber(c.runtimeFactory.GetRuntimeService()) + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + npp, ok := obj.(*appsv1alpha1.NodePodProbe) + if ok { + enqueue(queue, npp) + } + }, + UpdateFunc: func(oldObj, newObj interface{}) { + oldNodePodProbe, oldOK := oldObj.(*appsv1alpha1.NodePodProbe) + newNodePodProbe, newOK := newObj.(*appsv1alpha1.NodePodProbe) + if !oldOK || !newOK { + return + } + if reflect.DeepEqual(oldNodePodProbe.Spec, newNodePodProbe.Spec) { + return + } + enqueue(queue, newNodePodProbe) + }, + }) + + opts.Healthz.RegisterFunc("nodePodProbeInformerSynced", func(_ *http.Request) error { + if !informer.HasSynced() { + return fmt.Errorf("not synced") + } + return nil + }) + return c, nil +} + +func newNodePodProbeInformer(client kruiseclient.Interface, nodeName string) cache.SharedIndexInformer { + tweakListOptionsFunc := func(opt *metav1.ListOptions) { + opt.FieldSelector = "metadata.name=" + nodeName + } + + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + tweakListOptionsFunc(&options) + return client.AppsV1alpha1().NodePodProbes().List(context.TODO(), options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + tweakListOptionsFunc(&options) + return client.AppsV1alpha1().NodePodProbes().Watch(context.TODO(), options) + }, + }, + &appsv1alpha1.NodePodProbe{}, + 0, // do not resync + cache.Indexers{}, + ) +} + +func enqueue(queue workqueue.Interface, obj *appsv1alpha1.NodePodProbe) { + if obj.DeletionTimestamp != nil { + return + } + key, _ := cache.MetaNamespaceKeyFunc(obj) + queue.Add(key) +} + +func (c *Controller) Run(stop <-chan struct{}) { + defer utilruntime.HandleCrash() + defer c.queue.ShutDown() + + klog.Info("Starting informer for NodePodProbe") + go c.nodePodProbeInformer.Run(stop) + if 
!cache.WaitForCacheSync(stop, c.nodePodProbeInformer.HasSynced) { + return + } + + klog.Infof("Starting NodePodProbe controller") + // Launch a worker to process resources, for there is only one nodePodProbe per Node + go wait.Until(func() { + for c.processNextWorkItem() { + } + }, time.Second, stop) + + go wait.Until(func() { + for c.processUpdateWorkItem() { + } + }, time.Second, stop) + + klog.Info("Started NodePodProbe controller successfully") + <-stop +} + +// run probe worker based on NodePodProbe.Spec configuration +func (c *Controller) processNextWorkItem() bool { + // pull the next work item from queue. It should be a key we use to lookup + // something in a cache + key, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(key) + + err := c.sync() + if err == nil { + // No error, tell the queue to stop tracking history + c.queue.Forget(key) + } else { + // requeue the item to work on later + c.queue.AddRateLimited(key) + } + + return true +} + +func (c *Controller) sync() error { + // indicates must be deep copy before update pod objection + npp, err := c.nodePodProbeLister.Get(c.nodeName) + if errors.IsNotFound(err) { + return nil + } else if err != nil { + klog.Errorf("Failed to get nodePodProbe %s: %v", c.nodeName, err) + return err + } + + // run probe worker + c.workerLock.Lock() + validWorkers := map[probeKey]struct{}{} + for _, podProbe := range npp.Spec.PodProbes { + key := probeKey{podNs: podProbe.Namespace, podName: podProbe.Name, podUID: podProbe.UID} + for i := range podProbe.Probes { + probe := podProbe.Probes[i] + key.containerName = probe.ContainerName + key.probeName = probe.Name + validWorkers[key] = struct{}{} + if worker, ok := c.workers[key]; ok { + if !reflect.DeepEqual(probe.Probe, worker.getProbeSpec()) { + klog.Infof("NodePodProbe pod(%s) container(%s) probe changed from(%s) -> to(%s)", + key.podUID, key.containerName, commonutil.DumpJSON(worker.getProbeSpec()), commonutil.DumpJSON(probe.Probe)) + worker.updateProbeSpec(&probe.Probe) + } + continue + } + w := newWorker(c, key, &probe.Probe) + c.workers[key] = w + klog.Infof("NodePodProbe run pod(%s) container(%s) probe(%s) spec(%s) worker", key.podUID, key.containerName, key.probeName, commonutil.DumpJSON(probe.Probe)) + go w.run() + } + } + for key, worker := range c.workers { + if _, ok := validWorkers[key]; !ok { + klog.Infof("NodePodProbe stop pod(%s/%s) container(%s) probe(%s) worker", key.podNs, key.podName, key.containerName, key.probeName) + worker.stop() + } + } + c.workerLock.Unlock() + c.updateQueue.Add("updateStatus") + return nil +} + +// Record the execution result of probe worker to NodePodProbe Status +func (c *Controller) processUpdateWorkItem() bool { + key, quit := c.updateQueue.Get() + if quit { + return false + } + defer c.updateQueue.Done(key) + + err := c.syncUpdateNodePodProbeStatus() + if err == nil { + // No error, tell the queue to stop tracking history + c.queue.Forget(key) + } else { + // requeue the item to work on later + c.queue.AddRateLimited(key) + } + + return true +} + +func (c *Controller) syncUpdateNodePodProbeStatus() error { + // indicates must be deep copy before update pod objection + npp, err := c.nodePodProbeLister.Get(c.nodeName) + if err != nil { + if errors.IsNotFound(err) { + return nil + } + klog.Errorf("Get NodePodProbe(%s) failed: %s", c.nodeName, err.Error()) + return err + } + validSets := sets.NewString() + for _, podProbe := range npp.Spec.PodProbes { + for _, probe := range podProbe.Probes { + validSets.Insert(fmt.Sprintf("%s/%s", 
podProbe.UID, probe.Name)) + } + } + // If the PodProbe is deleted, the corresponding status will be clear + newStatus := &appsv1alpha1.NodePodProbeStatus{} + for _, probeStatus := range npp.Status.PodProbeStatuses { + newProbeStatus := appsv1alpha1.PodProbeStatus{ + Namespace: probeStatus.Namespace, + Name: probeStatus.Name, + UID: probeStatus.UID, + } + for i := range probeStatus.ProbeStates { + probeState := probeStatus.ProbeStates[i] + if validSets.Has(fmt.Sprintf("%s/%s", probeStatus.UID, probeState.Name)) { + newProbeStatus.ProbeStates = append(newProbeStatus.ProbeStates, probeState) + } + } + if len(newProbeStatus.ProbeStates) > 0 { + newStatus.PodProbeStatuses = append(newStatus.PodProbeStatuses, newProbeStatus) + } + } + // update nodePodProbe.Status according to probe worker + updates := c.result.listResults() + for _, update := range updates { + if !validSets.Has(fmt.Sprintf("%s/%s", update.Key.podUID, update.Key.probeName)) { + continue + } + //record probe result in pod event + ref := &corev1.ObjectReference{Kind: "Pod", Namespace: update.Key.podNs, Name: update.Key.podName, UID: types.UID(update.Key.podUID), + APIVersion: corev1.SchemeGroupVersion.String()} + if update.State == appsv1alpha1.ProbeSucceeded { + c.eventRecorder.Event(ref, corev1.EventTypeNormal, EventKruiseProbeSucceeded, update.Msg) + } else { + c.eventRecorder.Event(ref, corev1.EventTypeNormal, EventKruiseProbeFailed, update.Msg) + } + // update probe result in status + updateNodePodProbeStatus(update, newStatus) + } + if reflect.DeepEqual(npp.Status, newStatus) { + return nil + } + nppClone := npp.DeepCopy() + nppClone.Status = *newStatus + _, err = c.nodePodProbeClient.UpdateStatus(context.TODO(), nppClone, metav1.UpdateOptions{}) + if err != nil { + klog.Errorf("NodePodProbe(%s) update status failed: %s", c.nodeName, err.Error()) + return err + } + klog.Infof("NodePodProbe(%s) update status from(%s) -> to(%s) success", c.nodeName, commonutil.DumpJSON(npp.Status), commonutil.DumpJSON(nppClone.Status)) + return nil +} + +// Called by the worker after exiting. 
+func (c *Controller) removeWorker(key probeKey) { + c.workerLock.Lock() + defer c.workerLock.Unlock() + delete(c.workers, key) +} + +func (c *Controller) fetchLatestPodContainer(podUID, name string) (*runtimeapi.ContainerStatus, error) { + // runtimeService, for example docker + if c.runtimeFactory == nil { + klog.Warningf("NodePodProbe not found runtimeFactory") + return nil, nil + } + runtimeService := c.runtimeFactory.GetRuntimeService() + if runtimeService == nil { + klog.Warningf("NodePodProbe not found runtimeService") + return nil, nil + } + containers, err := runtimeService.ListContainers(&runtimeapi.ContainerFilter{ + LabelSelector: map[string]string{kubelettypes.KubernetesPodUIDLabel: podUID}, + }) + if err != nil { + klog.Errorf("NodePodProbe pod(%s) list containers failed: %s", podUID, err.Error()) + return nil, err + } + var container *runtimeapi.Container + for i := range containers { + obj := containers[i] + if obj.Metadata.Name != name { + continue + } + if container == nil || obj.CreatedAt > container.CreatedAt { + container = obj + } + } + var containerStatus *runtimeapi.ContainerStatus + if container != nil { + containerStatus, err = runtimeService.ContainerStatus(container.Id) + } + return containerStatus, err +} + +func updateNodePodProbeStatus(update Update, newStatus *appsv1alpha1.NodePodProbeStatus) { + var probeStatus *appsv1alpha1.PodProbeStatus + for i := range newStatus.PodProbeStatuses { + status := &newStatus.PodProbeStatuses[i] + if status.UID == update.Key.podUID { + probeStatus = status + break + } + } + if probeStatus == nil { + newStatus.PodProbeStatuses = append(newStatus.PodProbeStatuses, appsv1alpha1.PodProbeStatus{Namespace: update.Key.podNs, Name: update.Key.podName, UID: update.Key.podUID}) + probeStatus = &newStatus.PodProbeStatuses[len(newStatus.PodProbeStatuses)-1] + } + for i, obj := range probeStatus.ProbeStates { + if obj.Name == update.Key.probeName { + probeStatus.ProbeStates[i].State = update.State + probeStatus.ProbeStates[i].Message = update.Msg + probeStatus.ProbeStates[i].LastProbeTime = update.LastProbeTime + if obj.State != update.State { + probeStatus.ProbeStates[i].LastTransitionTime = metav1.Now() + } + return + } + } + probeStatus.ProbeStates = append(probeStatus.ProbeStates, appsv1alpha1.ContainerProbeState{ + Name: update.Key.probeName, + State: update.State, + LastProbeTime: update.LastProbeTime, + LastTransitionTime: metav1.Now(), + Message: update.Msg, + }) +} diff --git a/pkg/daemon/podprobe/pod_probe_controller_test.go b/pkg/daemon/podprobe/pod_probe_controller_test.go new file mode 100644 index 0000000000..9b1b91fe7c --- /dev/null +++ b/pkg/daemon/podprobe/pod_probe_controller_test.go @@ -0,0 +1,564 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package podprobe + +import ( + "reflect" + "sync" + "testing" + "time" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "github.com/openkruise/kruise/pkg/client/clientset/versioned/fake" + listersalpha1 "github.com/openkruise/kruise/pkg/client/listers/apps/v1alpha1" + commonutil "github.com/openkruise/kruise/pkg/util" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" +) + +var ( + demoNodePodProbe = appsv1alpha1.NodePodProbe{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-1", + }, + Spec: appsv1alpha1.NodePodProbeSpec{ + PodProbes: []appsv1alpha1.PodProbe{ + { + Name: "pod-0", + UID: "pod-0-uid", + Probes: []appsv1alpha1.ContainerProbe{ + { + Name: "ppm-1#healthy", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + }, + }, + InitialDelaySeconds: 100, + }, + }, + }, + }, + }, + { + Name: "pod-1", + UID: "pod-1-uid", + Probes: []appsv1alpha1.ContainerProbe{ + { + Name: "ppm-1#healthy", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + }, + }, + InitialDelaySeconds: 100, + }, + }, + }, + }, + }, + }, + }, + } +) + +func TestUpdateNodePodProbeStatus(t *testing.T) { + cases := []struct { + name string + getUpdate func() Update + getNodePodProbe func() *appsv1alpha1.NodePodProbe + expectNodePodProbeStatus func() appsv1alpha1.NodePodProbeStatus + }{ + { + name: "test1, update pod probe status", + getUpdate: func() Update { + return Update{Key: probeKey{"", "pod-1", "pod-1-uid", "main", "ppm-1#healthy"}, State: appsv1alpha1.ProbeSucceeded} + }, + getNodePodProbe: func() *appsv1alpha1.NodePodProbe { + demo := demoNodePodProbe.DeepCopy() + demo.Status = appsv1alpha1.NodePodProbeStatus{ + PodProbeStatuses: []appsv1alpha1.PodProbeStatus{ + { + Name: "pod-0", + UID: "pod-0-uid", + ProbeStates: []appsv1alpha1.ContainerProbeState{ + { + Name: "ppm-1#healthy", + State: appsv1alpha1.ProbeSucceeded, + }, + }, + }, + }, + } + return demo + }, + expectNodePodProbeStatus: func() appsv1alpha1.NodePodProbeStatus { + obj := appsv1alpha1.NodePodProbeStatus{ + PodProbeStatuses: []appsv1alpha1.PodProbeStatus{ + { + Name: "pod-0", + UID: "pod-0-uid", + ProbeStates: []appsv1alpha1.ContainerProbeState{ + { + Name: "ppm-1#healthy", + State: appsv1alpha1.ProbeSucceeded, + }, + }, + }, + { + Name: "pod-1", + UID: "pod-1-uid", + ProbeStates: []appsv1alpha1.ContainerProbeState{ + { + Name: "ppm-1#healthy", + State: appsv1alpha1.ProbeSucceeded, + }, + }, + }, + }, + } + return obj + }, + }, + { + name: "test2, update pod probe status", + getUpdate: func() Update { + return Update{Key: probeKey{"", "pod-1", "pod-1-uid", "main", "ppm-1#healthy"}, State: appsv1alpha1.ProbeSucceeded} + }, + getNodePodProbe: func() *appsv1alpha1.NodePodProbe { + demo := demoNodePodProbe.DeepCopy() + demo.Spec.PodProbes[1].Probes = append(demo.Spec.PodProbes[1].Probes, appsv1alpha1.ContainerProbe{ + Name: "ppm-1#other", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/other.sh"}, + }, + }, + InitialDelaySeconds: 100, + }, + }, + }) + demo.Status = 
appsv1alpha1.NodePodProbeStatus{ + PodProbeStatuses: []appsv1alpha1.PodProbeStatus{ + { + Name: "pod-0", + UID: "pod-0-uid", + ProbeStates: []appsv1alpha1.ContainerProbeState{ + { + Name: "ppm-1#healthy", + State: appsv1alpha1.ProbeSucceeded, + }, + }, + }, + { + Name: "pod-1", + UID: "pod-1-uid", + ProbeStates: []appsv1alpha1.ContainerProbeState{ + { + Name: "ppm-1#other", + State: appsv1alpha1.ProbeFailed, + }, + { + Name: "ppm-1#healthy", + State: appsv1alpha1.ProbeFailed, + }, + }, + }, + }, + } + return demo + }, + expectNodePodProbeStatus: func() appsv1alpha1.NodePodProbeStatus { + obj := appsv1alpha1.NodePodProbeStatus{ + PodProbeStatuses: []appsv1alpha1.PodProbeStatus{ + { + Name: "pod-0", + UID: "pod-0-uid", + ProbeStates: []appsv1alpha1.ContainerProbeState{ + { + Name: "ppm-1#healthy", + State: appsv1alpha1.ProbeSucceeded, + }, + }, + }, + { + Name: "pod-1", + UID: "pod-1-uid", + ProbeStates: []appsv1alpha1.ContainerProbeState{ + { + Name: "ppm-1#other", + State: appsv1alpha1.ProbeFailed, + }, + { + Name: "ppm-1#healthy", + State: appsv1alpha1.ProbeSucceeded, + }, + }, + }, + }, + } + return obj + }, + }, + { + name: "test3, update pod probe status", + getUpdate: func() Update { + return Update{Key: probeKey{"", "pod-1", "pod-1-uid", "main", "ppm-1#healthy"}, State: appsv1alpha1.ProbeSucceeded} + }, + getNodePodProbe: func() *appsv1alpha1.NodePodProbe { + demo := demoNodePodProbe.DeepCopy() + demo.Spec.PodProbes[1].Probes = append(demo.Spec.PodProbes[1].Probes, appsv1alpha1.ContainerProbe{ + Name: "ppm-1#other", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/other.sh"}, + }, + }, + InitialDelaySeconds: 100, + }, + }, + }) + demo.Status = appsv1alpha1.NodePodProbeStatus{ + PodProbeStatuses: []appsv1alpha1.PodProbeStatus{ + { + Name: "pod-1", + UID: "pod-1-uid", + ProbeStates: []appsv1alpha1.ContainerProbeState{ + { + Name: "ppm-1#other", + State: appsv1alpha1.ProbeFailed, + }, + }, + }, + }, + } + return demo + }, + expectNodePodProbeStatus: func() appsv1alpha1.NodePodProbeStatus { + obj := appsv1alpha1.NodePodProbeStatus{ + PodProbeStatuses: []appsv1alpha1.PodProbeStatus{ + { + Name: "pod-1", + UID: "pod-1-uid", + ProbeStates: []appsv1alpha1.ContainerProbeState{ + { + Name: "ppm-1#other", + State: appsv1alpha1.ProbeFailed, + }, + { + Name: "ppm-1#healthy", + State: appsv1alpha1.ProbeSucceeded, + }, + }, + }, + }, + } + return obj + }, + }, + { + name: "test4, update pod probe status", + getUpdate: func() Update { + return Update{} + }, + getNodePodProbe: func() *appsv1alpha1.NodePodProbe { + demo := demoNodePodProbe.DeepCopy() + demo.Status = appsv1alpha1.NodePodProbeStatus{ + PodProbeStatuses: []appsv1alpha1.PodProbeStatus{ + { + Name: "pod-1", + UID: "pod-1-uid", + ProbeStates: []appsv1alpha1.ContainerProbeState{ + { + Name: "ppm-2#other", + State: appsv1alpha1.ProbeFailed, + }, + { + Name: "ppm-1#healthy", + State: appsv1alpha1.ProbeFailed, + }, + }, + }, + { + Name: "pod-2", + UID: "pod-2-uid", + ProbeStates: []appsv1alpha1.ContainerProbeState{ + { + Name: "ppm-2#other", + State: appsv1alpha1.ProbeFailed, + }, + }, + }, + }, + } + return demo + }, + expectNodePodProbeStatus: func() appsv1alpha1.NodePodProbeStatus { + return appsv1alpha1.NodePodProbeStatus{ + PodProbeStatuses: []appsv1alpha1.PodProbeStatus{ + { + Name: "pod-1", + UID: "pod-1-uid", + ProbeStates: []appsv1alpha1.ContainerProbeState{ + { + Name: "ppm-1#healthy", + State: 
appsv1alpha1.ProbeFailed, + }, + }, + }, + }, + } + }, + }, + } + + for _, cs := range cases { + t.Run(cs.name, func(t *testing.T) { + fakeClient := fake.NewSimpleClientset(cs.getNodePodProbe()) + informer := newNodePodProbeInformer(fakeClient, "node-1") + fakeRecorder := record.NewFakeRecorder(100) + c := &Controller{ + nodePodProbeInformer: informer, + nodePodProbeLister: listersalpha1.NewNodePodProbeLister(informer.GetIndexer()), + workers: make(map[probeKey]*worker), + nodePodProbeClient: fakeClient.AppsV1alpha1().NodePodProbes(), + nodeName: "node-1", + result: newResultManager(workqueue.NewNamedRateLimitingQueue( + workqueue.NewItemExponentialFailureRateLimiter(500*time.Millisecond, 50*time.Second), + "sync_node_pod_probe", + )), + eventRecorder: fakeRecorder, + } + stopCh := make(chan struct{}, 1) + go c.nodePodProbeInformer.Run(stopCh) + if !cache.WaitForCacheSync(stopCh, c.nodePodProbeInformer.HasSynced) { + return + } + c.result.cache = &sync.Map{} + c.result.cache.Store("container-id-1", cs.getUpdate()) + err := c.syncUpdateNodePodProbeStatus() + if err != nil { + t.Fatalf("syncUpdateNodePodProbeStatus failed: %s", err.Error()) + return + } + time.Sleep(time.Second) + if !checkNodePodProbeStatusEqual(c.nodePodProbeLister, cs.expectNodePodProbeStatus()) { + t.Fatalf("checkNodePodProbeStatusEqual failed") + } + }) + } +} + +func checkNodePodProbeStatusEqual(lister listersalpha1.NodePodProbeLister, expect appsv1alpha1.NodePodProbeStatus) bool { + npp, err := lister.Get("node-1") + if err != nil { + klog.Errorf("Get NodePodProbe failed: %s", err.Error()) + return false + } + for i := range npp.Status.PodProbeStatuses { + podProbe := npp.Status.PodProbeStatuses[i] + for j := range podProbe.ProbeStates { + obj := &podProbe.ProbeStates[j] + obj.LastTransitionTime = metav1.Time{} + obj.LastProbeTime = metav1.Time{} + } + } + return reflect.DeepEqual(npp.Status.PodProbeStatuses, expect.PodProbeStatuses) +} + +func TestSyncNodePodProbe(t *testing.T) { + cases := []struct { + name string + getNodePodProbe func() *appsv1alpha1.NodePodProbe + setWorkers func(c *Controller) + expectWorkers func(c *Controller) map[probeKey]*worker + }{ + { + name: "test1, sync nodePodProbe", + getNodePodProbe: func() *appsv1alpha1.NodePodProbe { + demo := demoNodePodProbe.DeepCopy() + demo.Spec.PodProbes = demo.Spec.PodProbes[1:] + return demo + }, + setWorkers: func(c *Controller) { + c.workers = map[probeKey]*worker{} + key1 := probeKey{"", "pod-1", "pod-1-uid", "main", "ppm-1#check"} + c.workers[key1] = newWorker(c, key1, &appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/check.sh"}, + }, + }, + }, + }) + go c.workers[key1].run() + key2 := probeKey{"", "pod-1", "pod-1-uid", "main", "ppm-1#healthy"} + c.workers[key2] = newWorker(c, key2, &appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy2.sh"}, + }, + }, + }, + }) + go c.workers[key2].run() + }, + expectWorkers: func(c *Controller) map[probeKey]*worker { + expect := map[probeKey]*worker{} + key := probeKey{"", "pod-1", "pod-1-uid", "main", "ppm-1#healthy"} + expect[key] = newWorker(c, key, &appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + }, + }, + InitialDelaySeconds: 100, + }, + }) + return expect + }, + }, + { + name: "test2, sync 
nodePodProbe", + getNodePodProbe: func() *appsv1alpha1.NodePodProbe { + demo := demoNodePodProbe.DeepCopy() + demo.Spec.PodProbes[1].Probes = append(demo.Spec.PodProbes[0].Probes, appsv1alpha1.ContainerProbe{ + Name: "ppm-1#check", + ContainerName: "nginx", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/check.sh"}, + }, + }, + InitialDelaySeconds: 100, + }, + }, + }) + return demo + }, + setWorkers: func(c *Controller) { + c.workers = map[probeKey]*worker{} + }, + expectWorkers: func(c *Controller) map[probeKey]*worker { + expect := map[probeKey]*worker{} + key1 := probeKey{"", "pod-1", "pod-1-uid", "main", "ppm-1#healthy"} + expect[key1] = newWorker(c, key1, &appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + }, + }, + InitialDelaySeconds: 100, + }, + }) + key2 := probeKey{"", "pod-1", "pod-1-uid", "nginx", "ppm-1#check"} + expect[key2] = newWorker(c, key2, &appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/check.sh"}, + }, + }, + InitialDelaySeconds: 100, + }, + }) + key3 := probeKey{"", "pod-0", "pod-0-uid", "main", "ppm-1#healthy"} + expect[key3] = newWorker(c, key3, &appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "/healthy.sh"}, + }, + }, + InitialDelaySeconds: 100, + }, + }) + return expect + }, + }, + } + + for _, cs := range cases { + t.Run(cs.name, func(t *testing.T) { + fakeClient := fake.NewSimpleClientset(cs.getNodePodProbe()) + informer := newNodePodProbeInformer(fakeClient, "node-1") + c := &Controller{ + nodePodProbeInformer: informer, + nodePodProbeLister: listersalpha1.NewNodePodProbeLister(informer.GetIndexer()), + workers: make(map[probeKey]*worker), + nodePodProbeClient: fakeClient.AppsV1alpha1().NodePodProbes(), + nodeName: "node-1", + result: newResultManager(workqueue.NewNamedRateLimitingQueue( + workqueue.NewItemExponentialFailureRateLimiter(500*time.Millisecond, 50*time.Second), + "sync_node_pod_probe", + )), + updateQueue: workqueue.NewNamedRateLimitingQueue( + // Backoff duration from 500ms to 50~55s + workqueue.NewItemExponentialFailureRateLimiter(500*time.Millisecond, 50*time.Second+time.Millisecond*time.Duration(1000)), + "update_node_pod_probe_status", + ), + } + stopCh := make(chan struct{}, 1) + go c.nodePodProbeInformer.Run(stopCh) + if !cache.WaitForCacheSync(stopCh, c.nodePodProbeInformer.HasSynced) { + return + } + cs.setWorkers(c) + time.Sleep(time.Second) + err := c.sync() + if err != nil { + t.Fatalf("NodePodProbe sync failed: %s", err.Error()) + return + } + time.Sleep(time.Second) + if len(c.workers) != len(cs.expectWorkers(c)) { + t.Fatalf("expect(%d), but get(%d)", len(cs.expectWorkers(c)), len(c.workers)) + } + for _, worker := range cs.expectWorkers(c) { + obj, ok := c.workers[worker.key] + if !ok { + t.Fatalf("expect(%v), but not found", worker.key) + } + if !reflect.DeepEqual(worker.spec, obj.spec) { + t.Fatalf("expect(%s), but get(%s)", commonutil.DumpJSON(worker.spec), commonutil.DumpJSON(obj.spec)) + } + } + }) + } +} diff --git a/pkg/daemon/podprobe/prober.go b/pkg/daemon/podprobe/prober.go new file mode 100644 index 0000000000..70f9482717 --- /dev/null +++ b/pkg/daemon/podprobe/prober.go @@ -0,0 +1,149 @@ +/* 
+Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podprobe + +import ( + "bytes" + "fmt" + "io" + "time" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + criapi "k8s.io/cri-api/pkg/apis" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/probe" + execprobe "k8s.io/kubernetes/pkg/probe/exec" + "k8s.io/utils/exec" +) + +const maxProbeMessageLength = 1024 + +// Prober helps to check the probe(exec, http, tcp) of a container. +type prober struct { + exec execprobe.Prober + runtimeService criapi.RuntimeService +} + +// NewProber creates a Prober, it takes a command runner and +// several container info managers. +func newProber(runtimeService criapi.RuntimeService) *prober { + return &prober{ + exec: execprobe.New(), + runtimeService: runtimeService, + } +} + +// probe probes the container. +func (pb *prober) probe(p *appsv1alpha1.ContainerProbeSpec, container *runtimeapi.ContainerStatus, containerID string) (appsv1alpha1.ProbeState, string, error) { + result, msg, err := pb.runProbe(p, container, containerID) + if bytes.Count([]byte(msg), nil)-1 > maxProbeMessageLength { + msg = msg[:maxProbeMessageLength] + } + if err != nil || (result != probe.Success && result != probe.Warning) { + return appsv1alpha1.ProbeFailed, msg, err + } + return appsv1alpha1.ProbeSucceeded, msg, nil +} + +func (pb *prober) runProbe(p *appsv1alpha1.ContainerProbeSpec, container *runtimeapi.ContainerStatus, containerID string) (probe.Result, string, error) { + timeSecond := p.TimeoutSeconds + if timeSecond <= 0 { + timeSecond = 1 + } + timeout := time.Duration(timeSecond) * time.Second + // current only support exec + // todo: http, tcp + if p.Exec != nil { + return pb.exec.Probe(pb.newExecInContainer(containerID, p.Exec.Command, timeout)) + } + klog.InfoS("Failed to find probe builder for container", "containerName", container.Metadata.Name) + return probe.Unknown, "", fmt.Errorf("missing probe handler for %s", container.Metadata.Name) +} + +type execInContainer struct { + // run executes a command in a container. Combined stdout and stderr output is always returned. An + // error is returned if one occurred. 
+ run func() ([]byte, error) + writer io.Writer +} + +func (pb *prober) newExecInContainer(containerID string, cmd []string, timeout time.Duration) exec.Cmd { + return &execInContainer{run: func() ([]byte, error) { + stdout, stderr, err := pb.runtimeService.ExecSync(containerID, cmd, timeout) + if err != nil { + return stderr, err + } + return stdout, nil + }} +} + +func (eic *execInContainer) Run() error { + return nil +} + +func (eic *execInContainer) CombinedOutput() ([]byte, error) { + return eic.run() +} + +func (eic *execInContainer) Output() ([]byte, error) { + return nil, fmt.Errorf("unimplemented") +} + +func (eic *execInContainer) SetDir(dir string) { + // unimplemented +} + +func (eic *execInContainer) SetStdin(in io.Reader) { + // unimplemented +} + +func (eic *execInContainer) SetStdout(out io.Writer) { + eic.writer = out +} + +func (eic *execInContainer) SetStderr(out io.Writer) { + eic.writer = out +} + +func (eic *execInContainer) SetEnv(env []string) { + // unimplemented +} + +func (eic *execInContainer) Stop() { + // unimplemented +} + +func (eic *execInContainer) Start() error { + data, err := eic.run() + if eic.writer != nil { + eic.writer.Write(data) + } + return err +} + +func (eic *execInContainer) Wait() error { + return nil +} + +func (eic *execInContainer) StdoutPipe() (io.ReadCloser, error) { + return nil, fmt.Errorf("unimplemented") +} + +func (eic *execInContainer) StderrPipe() (io.ReadCloser, error) { + return nil, fmt.Errorf("unimplemented") +} diff --git a/pkg/daemon/podprobe/results_manager.go b/pkg/daemon/podprobe/results_manager.go new file mode 100644 index 0000000000..aac32c5c0d --- /dev/null +++ b/pkg/daemon/podprobe/results_manager.go @@ -0,0 +1,74 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podprobe + +import ( + "sync" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/workqueue" +) + +const maxSyncProbeTime = 600 + +// Update is an enum of the types of updates sent over the Updates channel. +type Update struct { + ContainerID string + Key probeKey + State appsv1alpha1.ProbeState + Msg string + LastProbeTime metav1.Time +} + +// resultManager implementation, store container probe result +type resultManager struct { + // map of container ID -> probe Result + cache *sync.Map + queue workqueue.RateLimitingInterface +} + +// newResultManager creates and returns an empty results resultManager. 
+func newResultManager(queue workqueue.RateLimitingInterface) *resultManager { + return &resultManager{ + cache: &sync.Map{}, + queue: queue, + } +} + +func (m *resultManager) listResults() []Update { + var results []Update + listFunc := func(key, value any) bool { + results = append(results, value.(Update)) + return true + } + m.cache.Range(listFunc) + return results +} + +func (m *resultManager) set(id string, key probeKey, result appsv1alpha1.ProbeState, msg string) { + currentTime := metav1.Now() + prev, exists := m.cache.Load(id) + if !exists || prev.(Update).State != result || currentTime.Sub(prev.(Update).LastProbeTime.Time) >= maxSyncProbeTime { + m.cache.Store(id, Update{id, key, result, msg, currentTime}) + m.queue.Add("updateStatus") + } +} + +func (m *resultManager) remove(id string) { + m.cache.Delete(id) +} diff --git a/pkg/daemon/podprobe/worker.go b/pkg/daemon/podprobe/worker.go new file mode 100644 index 0000000000..246258dbcf --- /dev/null +++ b/pkg/daemon/podprobe/worker.go @@ -0,0 +1,197 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podprobe + +import ( + "fmt" + "math/rand" + "reflect" + "time" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "github.com/openkruise/kruise/pkg/util" + "k8s.io/apimachinery/pkg/util/runtime" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" + "k8s.io/klog/v2" +) + +// worker handles the periodic probing of its assigned container. Each worker has a go-routine +// associated with it which runs the probe loop until the container permanently terminates, or the +// stop channel is closed. The worker uses the probe Manager's statusManager to get up-to-date +// container IDs. +type worker struct { + // Channel for stopping the probe. + stopCh chan struct{} + + // pod uid, container name, probe name + key probeKey + + // Describes the probe configuration + spec *appsv1alpha1.ContainerProbeSpec + + // The probe value during the initial delay. + initialValue appsv1alpha1.ProbeState + + probeController *Controller + + // The last known container ID for this worker. + containerID string + // The last probe result for this worker. + lastResult appsv1alpha1.ProbeState + // How many times in a row the probe has returned the same result. + resultRun int +} + +// Creates and starts a new probe worker. +func newWorker(c *Controller, key probeKey, probe *appsv1alpha1.ContainerProbeSpec) *worker { + + w := &worker{ + stopCh: make(chan struct{}, 1), // Buffer so stop() can be non-blocking. + key: key, + spec: probe, + probeController: c, + initialValue: appsv1alpha1.ProbeUnknown, + } + + return w +} + +// run periodically probes the container. +func (w *worker) run() { + periodSecond := w.spec.PeriodSeconds + if periodSecond < 1 { + periodSecond = 1 + } + probeTickerPeriod := time.Duration(periodSecond) * time.Second + // If kruise daemon restarted the probes could be started in rapid succession. + // Let the worker wait for a random portion of tickerPeriod before probing. 
+	// Do it only if the kruise daemon has started recently.
+	if probeTickerPeriod > time.Since(w.probeController.start) {
+		time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod)))
+	}
+	probeTicker := time.NewTicker(probeTickerPeriod)
+	defer func() {
+		// Clean up.
+		probeTicker.Stop()
+		if w.containerID != "" {
+			w.probeController.result.remove(w.containerID)
+		}
+		w.probeController.removeWorker(w.key)
+	}()
+
+probeLoop:
+	for w.doProbe() {
+		// Wait for next probe tick.
+		select {
+		case <-w.stopCh:
+			break probeLoop
+		case <-probeTicker.C:
+		}
+	}
+}
+
+// stop stops the probe worker. The worker handles cleanup and removes itself from its manager.
+// It is safe to call stop multiple times.
+func (w *worker) stop() {
+	select {
+	case w.stopCh <- struct{}{}:
+	default: // Non-blocking.
+	}
+}
+
+// doProbe probes the container once and records the result.
+// Returns whether the worker should continue.
+func (w *worker) doProbe() (keepGoing bool) {
+	defer func() { recover() }() // Actually eat panics (HandleCrash takes care of logging)
+	defer runtime.HandleCrash(func(_ interface{}) { keepGoing = true })
+
+	container, _ := w.probeController.fetchLatestPodContainer(w.key.podUID, w.key.containerName)
+	if container == nil {
+		klog.V(5).Infof("Pod(%s/%s) container(%s) Not Found", w.key.podNs, w.key.podName, w.key.containerName)
+		return true
+	}
+
+	if w.containerID != container.Id {
+		if w.containerID != "" {
+			w.probeController.result.remove(w.containerID)
+		}
+		klog.V(5).Infof("Pod(%s/%s) container(%s) Id changed(%s -> %s)", w.key.podNs, w.key.podName, w.key.containerName, w.containerID, container.Id)
+		w.containerID = container.Id
+		w.probeController.result.set(w.containerID, w.key, w.initialValue, "")
+	}
+	if container.State != runtimeapi.ContainerState_CONTAINER_RUNNING {
+		klog.V(5).Infof("Pod(%s/%s) Non-running container(%s) probed", w.key.podNs, w.key.podName, w.key.containerName)
+		w.probeController.result.set(w.containerID, w.key, appsv1alpha1.ProbeFailed, fmt.Sprintf("Container(%s) is Non-running", w.key.containerName))
+	}
+
+	// Probe disabled for InitialDelaySeconds.
+	initialDelay := w.spec.InitialDelaySeconds
+	if initialDelay < 1 {
+		initialDelay = 1
+	}
+	curDelay := int32(time.Since(time.Unix(0, container.StartedAt)).Seconds())
+	if curDelay < initialDelay {
+		klog.V(5).Infof("Pod(%s:%s) container(%s) probe(%s) initialDelay(%d), but curDelay(%d)",
+			w.key.podNs, w.key.podName, w.key.containerName, w.key.probeName, initialDelay, curDelay)
+		return true
+	}
+
+	// Note: we do not have the full container environment here, so to fully support env-dependent exec probes we
+	// would either have to reconstruct it OR make a call to the CRI in order to get those environment
+	// values from the running container.
+	result, msg, err := w.probeController.prober.probe(w.spec, container, w.containerID)
+	if err != nil {
+		klog.Errorf("Pod(%s/%s) do container(%s) probe(%s) spec(%s) failed: %s",
+			w.key.podNs, w.key.podName, w.key.containerName, w.key.probeName, util.DumpJSON(w.spec), err.Error())
+		return true
+	}
+	if w.lastResult == result {
+		w.resultRun++
+	} else {
+		w.lastResult = result
+		w.resultRun = 1
+	}
+
+	failureThreshold := w.spec.FailureThreshold
+	if failureThreshold <= 0 {
+		failureThreshold = 1
+	}
+	successThreshold := w.spec.SuccessThreshold
+	if successThreshold <= 0 {
+		successThreshold = 1
+	}
+	if (result == appsv1alpha1.ProbeFailed && w.resultRun < int(failureThreshold)) ||
+		(result == appsv1alpha1.ProbeSucceeded && w.resultRun < int(successThreshold)) {
+		// Success or failure is below threshold - leave the probe state unchanged.
+		return true
+	}
+	w.probeController.result.set(w.containerID, w.key, result, msg)
+	return true
+}
+
+func (w *worker) getProbeSpec() *appsv1alpha1.ContainerProbeSpec {
+	return w.spec
+}
+
+func (w *worker) updateProbeSpec(spec *appsv1alpha1.ContainerProbeSpec) {
+	if !reflect.DeepEqual(w.spec.Handler, spec.Handler) {
+		if w.containerID != "" {
+			klog.Infof("Pod(%s) container(%s) probe spec changed", w.key.podUID, w.key.containerName)
+			w.probeController.result.set(w.containerID, w.key, w.initialValue, "")
+		}
+	}
+	w.spec = spec
+}
diff --git a/pkg/features/kruise_features.go b/pkg/features/kruise_features.go
index a035beae6b..4ed79425ae 100644
--- a/pkg/features/kruise_features.go
+++ b/pkg/features/kruise_features.go
@@ -85,6 +85,10 @@ const (
 
 	// SidecarSetPatchPodMetadataDefaultsAllowed whether sidecarSet patch pod metadata is allowed
 	SidecarSetPatchPodMetadataDefaultsAllowed featuregate.Feature = "SidecarSetPatchPodMetadataDefaultsAllowed"
+
+	// PodProbeMarkerGate enables Kruise to provide the ability to execute custom probes.
+	// Note: custom probe execution requires the kruise daemon, so currently only the traditional Kubelet is supported, not virtual-kubelet.
+	PodProbeMarkerGate featuregate.Feature = "PodProbeMarkerGate"
 )
 
 var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
@@ -104,6 +108,7 @@ var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
 	InPlaceUpdateEnvFromMetadata:              {Default: false, PreRelease: featuregate.Alpha},
 	StatefulSetAutoDeletePVC:                  {Default: false, PreRelease: featuregate.Alpha},
 	SidecarSetPatchPodMetadataDefaultsAllowed: {Default: false, PreRelease: featuregate.Alpha},
+	PodProbeMarkerGate:                        {Default: false, PreRelease: featuregate.Alpha},
 }
 
 func init() {
diff --git a/test/e2e/apps/podprobemarker.go b/test/e2e/apps/podprobemarker.go
new file mode 100644
index 0000000000..35ac0bc721
--- /dev/null
+++ b/test/e2e/apps/podprobemarker.go
@@ -0,0 +1,263 @@
+/*
+Copyright 2022 The Kruise Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+*/ + +package apps + +import ( + "context" + "fmt" + "time" + + "github.com/onsi/ginkgo" + "github.com/onsi/gomega" + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" + "github.com/openkruise/kruise/pkg/controller/podprobemarker" + "github.com/openkruise/kruise/pkg/util" + "github.com/openkruise/kruise/test/e2e/framework" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" + clientset "k8s.io/client-go/kubernetes" + utilpointer "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" +) + +var _ = SIGDescribe("PodProbeMarker", func() { + f := framework.NewDefaultFramework("podprobemarkers") + var ns string + var c clientset.Interface + var kc kruiseclientset.Interface + var tester *framework.PodProbeMarkerTester + + ginkgo.BeforeEach(func() { + c = f.ClientSet + kc = f.KruiseClientSet + ns = f.Namespace.Name + tester = framework.NewPodProbeMarkerTester(c, kc) + }) + + framework.KruiseDescribe("PodProbeMarker functionality", func() { + + ginkgo.AfterEach(func() { + if ginkgo.CurrentGinkgoTestDescription().Failed { + framework.DumpDebugInfo(c, ns) + } + }) + + ginkgo.It("pod probe marker test1", func() { + nodeList, err := c.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + if len(nodeList.Items) == 0 { + ginkgo.By("pod probe markers list nodeList is zero") + return + } + nppList, err := kc.AppsV1alpha1().NodePodProbes().List(context.TODO(), metav1.ListOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(nppList.Items).To(gomega.HaveLen(len(nodeList.Items))) + + // create statefulset + sts := tester.NewBaseStatefulSet(ns) + ginkgo.By(fmt.Sprintf("Create statefulset(%s/%s)", sts.Namespace, sts.Name)) + tester.CreateStatefulSet(sts) + + // create pod probe marker + ppmList := tester.NewPodProbeMarker(ns) + ppm1, ppm2 := &ppmList[0], &ppmList[1] + _, err = kc.AppsV1alpha1().PodProbeMarkers(ns).Create(context.TODO(), ppm1, metav1.CreateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + time.Sleep(time.Second * 10) + + // check finalizer + ppm1, err = kc.AppsV1alpha1().PodProbeMarkers(ns).Get(context.TODO(), ppm1.Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(controllerutil.ContainsFinalizer(ppm1, podprobemarker.PodProbeMarkerFinalizer)).To(gomega.BeTrue()) + + pods, err := tester.ListActivePods(ns) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(pods).To(gomega.HaveLen(int(*sts.Spec.Replicas))) + validPods := sets.NewString() + for _, pod := range pods { + validPods.Insert(string(pod.UID)) + npp, err := kc.AppsV1alpha1().NodePodProbes().Get(context.TODO(), pod.Spec.NodeName, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + var podProbe *appsv1alpha1.PodProbe + for i := range npp.Spec.PodProbes { + obj := &npp.Spec.PodProbes[i] + if obj.UID == string(pod.UID) { + podProbe = obj + break + } + } + gomega.Expect(podProbe).NotTo(gomega.BeNil()) + gomega.Expect(pod.Labels["nginx"]).To(gomega.Equal("healthy")) + condition := util.GetCondition(pod, "game.kruise.io/healthy") + gomega.Expect(condition).NotTo(gomega.BeNil()) + gomega.Expect(string(condition.Status)).To(gomega.Equal(string(v1.ConditionTrue))) + condition = util.GetCondition(pod, "game.kruise.io/check") + gomega.Expect(condition).To(gomega.BeNil()) + } + nppList, err = 
kc.AppsV1alpha1().NodePodProbes().List(context.TODO(), metav1.ListOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + for _, npp := range nppList.Items { + for _, podProbe := range npp.Spec.PodProbes { + gomega.Expect(validPods.Has(podProbe.UID)).To(gomega.BeTrue()) + } + } + // create other pod probe marker + _, err = kc.AppsV1alpha1().PodProbeMarkers(ns).Create(context.TODO(), ppm2, metav1.CreateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + time.Sleep(time.Second * 10) + + pods, err = tester.ListActivePods(ns) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(pods).To(gomega.HaveLen(int(*sts.Spec.Replicas))) + for _, pod := range pods { + // healthy probe + gomega.Expect(pod.Labels["nginx"]).To(gomega.Equal("healthy")) + condition := util.GetCondition(pod, "game.kruise.io/healthy") + gomega.Expect(condition).NotTo(gomega.BeNil()) + gomega.Expect(string(condition.Status)).To(gomega.Equal(string(v1.ConditionTrue))) + // check probe + gomega.Expect(pod.Annotations["controller.kubernetes.io/pod-deletion-cost"]).To(gomega.Equal("10")) + condition = util.GetCondition(pod, "game.kruise.io/check") + gomega.Expect(condition).NotTo(gomega.BeNil()) + gomega.Expect(string(condition.Status)).To(gomega.Equal(string(v1.ConditionTrue))) + } + + // update failed probe + ppm1, err = kc.AppsV1alpha1().PodProbeMarkers(ns).Get(context.TODO(), ppm1.Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + ppm1.Spec.Probes[0].Probe.Exec = &v1.ExecAction{ + Command: []string{"/bin/sh", "-c", "failed /"}, + } + _, err = kc.AppsV1alpha1().PodProbeMarkers(ns).Update(context.TODO(), ppm1, metav1.UpdateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + ppm2, err = kc.AppsV1alpha1().PodProbeMarkers(ns).Get(context.TODO(), ppm2.Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + ppm2.Spec.Probes[0].Probe.Exec = &v1.ExecAction{ + Command: []string{"/bin/sh", "-c", "failed -ef"}, + } + _, err = kc.AppsV1alpha1().PodProbeMarkers(ns).Update(context.TODO(), ppm2, metav1.UpdateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + time.Sleep(time.Second * 10) + + pods, err = tester.ListActivePods(ns) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(pods).To(gomega.HaveLen(int(*sts.Spec.Replicas))) + for _, pod := range pods { + // healthy probe + gomega.Expect(pod.Labels["nginx"]).To(gomega.Equal("")) + condition := util.GetCondition(pod, "game.kruise.io/healthy") + gomega.Expect(condition).NotTo(gomega.BeNil()) + gomega.Expect(string(condition.Status)).To(gomega.Equal(string(v1.ConditionFalse))) + // check probe + gomega.Expect(pod.Annotations["controller.kubernetes.io/pod-deletion-cost"]).To(gomega.Equal("-10")) + condition = util.GetCondition(pod, "game.kruise.io/check") + gomega.Expect(condition).NotTo(gomega.BeNil()) + gomega.Expect(string(condition.Status)).To(gomega.Equal(string(v1.ConditionFalse))) + } + + // update success probe + ppm1, err = kc.AppsV1alpha1().PodProbeMarkers(ns).Get(context.TODO(), ppm1.Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + ppm1.Spec.Probes[0].Probe.Exec = &v1.ExecAction{ + Command: []string{"/bin/sh", "-c", "ls /"}, + } + _, err = kc.AppsV1alpha1().PodProbeMarkers(ns).Update(context.TODO(), ppm1, metav1.UpdateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + // scale down + sts, err = c.AppsV1().StatefulSets(ns).Get(context.TODO(), sts.Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + 
sts.Spec.Replicas = utilpointer.Int32Ptr(1) + _, err = c.AppsV1().StatefulSets(ns).Update(context.TODO(), sts, metav1.UpdateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + time.Sleep(time.Second * 10) + + pods, err = tester.ListActivePods(ns) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(pods).To(gomega.HaveLen(int(*sts.Spec.Replicas))) + validPods = sets.NewString() + for _, pod := range pods { + validPods.Insert(string(pod.UID)) + // healthy probe + gomega.Expect(pod.Labels["nginx"]).To(gomega.Equal("healthy")) + condition := util.GetCondition(pod, "game.kruise.io/healthy") + gomega.Expect(condition).NotTo(gomega.BeNil()) + gomega.Expect(string(condition.Status)).To(gomega.Equal(string(v1.ConditionTrue))) + // check probe + gomega.Expect(pod.Annotations["controller.kubernetes.io/pod-deletion-cost"]).To(gomega.Equal("-10")) + condition = util.GetCondition(pod, "game.kruise.io/check") + gomega.Expect(condition).NotTo(gomega.BeNil()) + gomega.Expect(string(condition.Status)).To(gomega.Equal(string(v1.ConditionFalse))) + } + nppList, err = kc.AppsV1alpha1().NodePodProbes().List(context.TODO(), metav1.ListOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + for _, npp := range nppList.Items { + for _, podProbe := range npp.Spec.PodProbes { + gomega.Expect(validPods.Has(podProbe.UID)).To(gomega.BeTrue()) + } + } + + // scale up + sts, err = c.AppsV1().StatefulSets(ns).Get(context.TODO(), sts.Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + sts.Spec.Replicas = utilpointer.Int32Ptr(2) + _, err = c.AppsV1().StatefulSets(ns).Update(context.TODO(), sts, metav1.UpdateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + tester.WaitForStatefulSetRunning(sts) + time.Sleep(time.Second * 10) + + pods, err = tester.ListActivePods(ns) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(pods).To(gomega.HaveLen(int(*sts.Spec.Replicas))) + validPods = sets.NewString() + for _, pod := range pods { + validPods.Insert(string(pod.UID)) + // healthy probe + gomega.Expect(pod.Labels["nginx"]).To(gomega.Equal("healthy")) + condition := util.GetCondition(pod, "game.kruise.io/healthy") + gomega.Expect(condition).NotTo(gomega.BeNil()) + gomega.Expect(string(condition.Status)).To(gomega.Equal(string(v1.ConditionTrue))) + // check probe + gomega.Expect(pod.Annotations["controller.kubernetes.io/pod-deletion-cost"]).To(gomega.Equal("-10")) + condition = util.GetCondition(pod, "game.kruise.io/check") + gomega.Expect(condition).NotTo(gomega.BeNil()) + gomega.Expect(string(condition.Status)).To(gomega.Equal(string(v1.ConditionFalse))) + } + nppList, err = kc.AppsV1alpha1().NodePodProbes().List(context.TODO(), metav1.ListOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + for _, npp := range nppList.Items { + for _, podProbe := range npp.Spec.PodProbes { + gomega.Expect(validPods.Has(podProbe.UID)).To(gomega.BeTrue()) + } + } + + // delete podProbeMarker + for _, ppm := range ppmList { + err = kc.AppsV1alpha1().PodProbeMarkers(ns).Delete(context.TODO(), ppm.Name, metav1.DeleteOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + } + time.Sleep(time.Second * 3) + nppList, err = kc.AppsV1alpha1().NodePodProbes().List(context.TODO(), metav1.ListOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + for _, npp := range nppList.Items { + gomega.Expect(npp.Spec.PodProbes).To(gomega.HaveLen(0)) + } + }) + }) +}) diff --git a/test/e2e/framework/pod_probe_marker_util.go 
b/test/e2e/framework/pod_probe_marker_util.go new file mode 100644 index 0000000000..4dd0b13c22 --- /dev/null +++ b/test/e2e/framework/pod_probe_marker_util.go @@ -0,0 +1,218 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "context" + "time" + + "github.com/onsi/gomega" + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" + apps "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + clientset "k8s.io/client-go/kubernetes" + kubecontroller "k8s.io/kubernetes/pkg/controller" + utilpointer "k8s.io/utils/pointer" +) + +type PodProbeMarkerTester struct { + c clientset.Interface + kc kruiseclientset.Interface +} + +func NewPodProbeMarkerTester(c clientset.Interface, kc kruiseclientset.Interface) *PodProbeMarkerTester { + return &PodProbeMarkerTester{ + c: c, + kc: kc, + } +} + +func (s *PodProbeMarkerTester) NewPodProbeMarker(ns string) []appsv1alpha1.PodProbeMarker { + nginx := appsv1alpha1.PodProbeMarker{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ppm-nginx", + Namespace: ns, + }, + Spec: appsv1alpha1.PodProbeMarkerSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "probe", + }, + }, + Probes: []appsv1alpha1.PodContainerProbe{ + { + Name: "healthy", + ContainerName: "nginx", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "ls /"}, + }, + }, + }, + }, + PodConditionType: "game.kruise.io/healthy", + MarkerPolicy: []appsv1alpha1.ProbeMarkerPolicy{ + { + State: appsv1alpha1.ProbeSucceeded, + Labels: map[string]string{ + "nginx": "healthy", + }, + }, + }, + }, + }, + }, + } + + main := appsv1alpha1.PodProbeMarker{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ppm-main", + Namespace: ns, + }, + Spec: appsv1alpha1.PodProbeMarkerSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "probe", + }, + }, + Probes: []appsv1alpha1.PodContainerProbe{ + { + Name: "check", + ContainerName: "main", + Probe: appsv1alpha1.ContainerProbeSpec{ + Probe: corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/sh", "-c", "ps -ef"}, + }, + }, + }, + }, + PodConditionType: "game.kruise.io/check", + MarkerPolicy: []appsv1alpha1.ProbeMarkerPolicy{ + { + State: appsv1alpha1.ProbeSucceeded, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "10", + }, + }, + { + State: appsv1alpha1.ProbeFailed, + Annotations: map[string]string{ + "controller.kubernetes.io/pod-deletion-cost": "-10", + }, + }, + }, + }, + }, + }, + } + + return []appsv1alpha1.PodProbeMarker{nginx, main} +} + +func (s *PodProbeMarkerTester) NewBaseStatefulSet(namespace string) *apps.StatefulSet { + return &apps.StatefulSet{ + TypeMeta: metav1.TypeMeta{ + Kind: "StatefulSet", + APIVersion: 
"apps/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "deployment-test", + Namespace: namespace, + }, + Spec: apps.StatefulSetSpec{ + ServiceName: "fake-service", + Replicas: utilpointer.Int32Ptr(2), + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "probe", + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "probe", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "nginx", + Image: "nginx:1.15.1", + ImagePullPolicy: corev1.PullIfNotPresent, + }, + { + Name: "main", + Image: "centos:6.7", + Command: []string{"sleep", "999d"}, + ImagePullPolicy: corev1.PullIfNotPresent, + }, + }, + }, + }, + }, + } +} + +func (s *PodProbeMarkerTester) CreateStatefulSet(sts *apps.StatefulSet) { + Logf("create sts(%s/%s)", sts.Namespace, sts.Name) + _, err := s.c.AppsV1().StatefulSets(sts.Namespace).Create(context.TODO(), sts, metav1.CreateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + s.WaitForStatefulSetRunning(sts) +} + +func (s *PodProbeMarkerTester) WaitForStatefulSetRunning(sts *apps.StatefulSet) { + pollErr := wait.PollImmediate(time.Second, time.Minute, + func() (bool, error) { + inner, err := s.c.AppsV1().StatefulSets(sts.Namespace).Get(context.TODO(), sts.Name, metav1.GetOptions{}) + if err != nil { + return false, nil + } + if inner.Generation != inner.Status.ObservedGeneration { + return false, nil + } + if *inner.Spec.Replicas == inner.Status.Replicas && *inner.Spec.Replicas == inner.Status.UpdatedReplicas && + *inner.Spec.Replicas == inner.Status.ReadyReplicas { + return true, nil + } + return false, nil + }) + if pollErr != nil { + Failf("Failed waiting for statefulset to enter running: %v", pollErr) + } +} + +func (s *PodProbeMarkerTester) ListActivePods(ns string) ([]*corev1.Pod, error) { + podList, err := s.c.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return nil, err + } + var pods []*corev1.Pod + for i := range podList.Items { + pod := &podList.Items[i] + if kubecontroller.IsPodActive(pod) && pod.Spec.NodeName != "" { + pods = append(pods, pod) + } + } + return pods, nil +}