diff --git a/pkg/controllers/execution/execution_controller.go b/pkg/controllers/execution/execution_controller.go index 0f1b5906f1b6..0d8d77b2a331 100644 --- a/pkg/controllers/execution/execution_controller.go +++ b/pkg/controllers/execution/execution_controller.go @@ -52,6 +52,10 @@ import ( const ( // ControllerName is the controller name that will be used when reporting events. ControllerName = "execution-controller" + // WorkSuspendDispatchingConditionMessage is the condition and event message when dispatching is suspended. + WorkSuspendDispatchingConditionMessage = "Work dispatching is in a suspended state." + // WorkDispatchingConditionMessage is the condition and event message when dispatching is not suspended. + WorkDispatchingConditionMessage = "Work is being dispatched to member clusters." // workSuspendDispatchingConditionReason is the reason for the WorkDispatching condition when dispatching is suspended. workSuspendDispatchingConditionReason = "SuspendDispatching" // workDispatchingConditionReason is the reason for the WorkDispatching condition when dispatching is not suspended. @@ -270,18 +274,28 @@ func (c *Controller) updateWorkDispatchingConditionIfNeeded(ctx context.Context, if helper.IsWorkSuspendDispatching(work) { newWorkDispatchingCondition.Status = metav1.ConditionFalse newWorkDispatchingCondition.Reason = workSuspendDispatchingConditionReason - newWorkDispatchingCondition.Message = "Work dispatching is in a suspended state." + newWorkDispatchingCondition.Message = WorkSuspendDispatchingConditionMessage } else { newWorkDispatchingCondition.Status = metav1.ConditionTrue newWorkDispatchingCondition.Reason = workDispatchingConditionReason - newWorkDispatchingCondition.Message = "Work is being dispatched to member clusters." + newWorkDispatchingCondition.Message = WorkDispatchingConditionMessage } if meta.IsStatusConditionPresentAndEqual(work.Status.Conditions, newWorkDispatchingCondition.Type, newWorkDispatchingCondition.Status) { return nil } - return c.setStatusCondition(ctx, work, newWorkDispatchingCondition) + if err := c.setStatusCondition(ctx, work, newWorkDispatchingCondition); err != nil { + return err + } + + obj, err := helper.ToUnstructured(work) + if err != nil { + return err + } + + c.eventf(obj, corev1.EventTypeNormal, events.EventReasonWorkDispatching, newWorkDispatchingCondition.Message) + return nil } // updateAppliedCondition updates the applied condition for the given Work. diff --git a/pkg/controllers/execution/execution_controller_test.go b/pkg/controllers/execution/execution_controller_test.go index 5c8500a3c898..5c7883ed7d97 100644 --- a/pkg/controllers/execution/execution_controller_test.go +++ b/pkg/controllers/execution/execution_controller_test.go @@ -17,19 +17,24 @@ package execution import ( "context" + "encoding/json" + "fmt" "testing" "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/client-go/tools/record" "k8s.io/utils/ptr" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client/fake" clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" + "github.com/karmada-io/karmada/pkg/events" "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" "github.com/karmada-io/karmada/pkg/util/gclient" "github.com/karmada-io/karmada/pkg/util/helper" @@ -38,34 +43,50 @@ import ( func TestExecutionController_Reconcile(t *testing.T) { tests := []struct { - name string - work *workv1alpha1.Work - ns string - expectRes controllerruntime.Result - expectCondition *metav1.Condition - existErr bool + name string + work *workv1alpha1.Work + ns string + expectRes controllerruntime.Result + expectCondition *metav1.Condition + expectEventMessage string + existErr bool }{ { - name: "work dispatching is suspended, no error, no apply", - work: newWork(func(work *workv1alpha1.Work) { - work.Spec.SuspendDispatching = ptr.To(true) - }), + name: "work dispatching is suspended, no error, no apply", ns: "karmada-es-cluster", expectRes: controllerruntime.Result{}, existErr: false, + work: newWork(func(work *workv1alpha1.Work) { + work.Spec.SuspendDispatching = ptr.To(true) + }), + }, + { + name: "work dispatching is suspended, adds false dispatching condition", + ns: "karmada-es-cluster", + expectRes: controllerruntime.Result{}, + expectCondition: &metav1.Condition{Type: workv1alpha1.WorkDispatching, Status: metav1.ConditionFalse}, + existErr: false, + + work: newWork(func(w *workv1alpha1.Work) { + w.Spec.SuspendDispatching = ptr.To(true) + }), }, { - name: "work dispatching is suspended, adds false dispatching condition", + name: "work dispatching is suspended, adds event message", + ns: "karmada-es-cluster", + expectRes: controllerruntime.Result{}, + expectEventMessage: fmt.Sprintf("%s %s %s", corev1.EventTypeNormal, events.EventReasonWorkDispatching, WorkSuspendDispatchingConditionMessage), + existErr: false, work: newWork(func(w *workv1alpha1.Work) { w.Spec.SuspendDispatching = ptr.To(true) }), + }, + { + name: "work dispatching is suspended, overwrites existing dispatching condition", ns: "karmada-es-cluster", expectRes: controllerruntime.Result{}, expectCondition: &metav1.Condition{Type: workv1alpha1.WorkDispatching, Status: metav1.ConditionFalse}, existErr: false, - }, - { - name: "work dispatching is suspended, overwrites existing dispatching condition", work: newWork(func(w *workv1alpha1.Work) { w.Spec.SuspendDispatching = ptr.To(true) meta.SetStatusCondition(&w.Status.Conditions, metav1.Condition{ @@ -74,10 +95,6 @@ func TestExecutionController_Reconcile(t *testing.T) { Reason: workDispatchingConditionReason, }) }), - ns: "karmada-es-cluster", - expectRes: controllerruntime.Result{}, - expectCondition: &metav1.Condition{Type: workv1alpha1.WorkDispatching, Status: metav1.ConditionFalse}, - existErr: false, }, } @@ -90,7 +107,8 @@ func TestExecutionController_Reconcile(t *testing.T) { }, } - c := newController(tt.work) + eventRecorder := record.NewFakeRecorder(1) + c := newController(tt.work, eventRecorder) res, err := c.Reconcile(context.Background(), req) assert.Equal(t, tt.expectRes, res) if tt.existErr { @@ -103,21 +121,30 @@ func TestExecutionController_Reconcile(t *testing.T) { assert.NoError(t, c.Client.Get(context.Background(), req.NamespacedName, tt.work)) assert.True(t, meta.IsStatusConditionPresentAndEqual(tt.work.Status.Conditions, tt.expectCondition.Type, tt.expectCondition.Status)) } + + if tt.expectEventMessage != "" { + assert.Equal(t, 1, len(eventRecorder.Events)) + e := <-eventRecorder.Events + assert.Equal(t, tt.expectEventMessage, e) + } }) } } -func newController(work *workv1alpha1.Work) Controller { +func newController(work *workv1alpha1.Work, eventRecorder *record.FakeRecorder) Controller { cluster := newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue) return Controller{ Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(cluster, work).WithStatusSubresource(work).Build(), InformerManager: genericmanager.GetInstance(), PredicateFunc: helper.NewClusterPredicateOnAgent("test"), + EventRecorder: eventRecorder, } } func newWork(applyFunc func(work *workv1alpha1.Work)) *workv1alpha1.Work { - work := testhelper.NewWork("work", "karmada-es-cluster", string(uuid.NewUUID()), nil) + pod := testhelper.NewPod("default", "test") + bytes, _ := json.Marshal(pod) + work := testhelper.NewWork("work", "karmada-es-cluster", string(uuid.NewUUID()), bytes) if applyFunc != nil { applyFunc(work) } diff --git a/pkg/events/events.go b/pkg/events/events.go index ea94a2cf99dd..8e3e945b2247 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -54,6 +54,8 @@ const ( EventReasonSyncWorkloadFailed = "SyncFailed" // EventReasonSyncWorkloadSucceed indicates that Sync workload succeed. EventReasonSyncWorkloadSucceed = "SyncSucceed" + // EventReasonWorkDispatching indicates that work is dispatching or not. + EventReasonWorkDispatching = "WorkDispatching" ) // Define events for ResourceBinding and ClusterResourceBinding objects. diff --git a/pkg/util/lifted/corev1printers.go b/pkg/util/lifted/corev1printers.go index ab493499c988..b85afd37346c 100644 --- a/pkg/util/lifted/corev1printers.go +++ b/pkg/util/lifted/corev1printers.go @@ -21,11 +21,10 @@ import ( "strings" "time" - "k8s.io/apimachinery/pkg/util/duration" - corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/duration" "k8s.io/apimachinery/pkg/util/sets" "github.com/karmada-io/karmada/pkg/printers" diff --git a/pkg/util/lifted/scheduler/cache/cache.go b/pkg/util/lifted/scheduler/cache/cache.go index 15fa87f3a736..f3d3793c74ed 100644 --- a/pkg/util/lifted/scheduler/cache/cache.go +++ b/pkg/util/lifted/scheduler/cache/cache.go @@ -489,7 +489,6 @@ func (cache *cacheImpl) removePod(pod *corev1.Pod) error { n, ok := cache.nodes[pod.Spec.NodeName] if !ok { klog.ErrorS(nil, "Node not found when trying to remove pod", "node", klog.KRef("", pod.Spec.NodeName), "podKey", key, "pod", klog.KObj(pod)) - } else { if err := n.info.RemovePod(pod); err != nil { return err diff --git a/pkg/util/lifted/validatingfhpa.go b/pkg/util/lifted/validatingfhpa.go old mode 100755 new mode 100644 index 9eea4ed2be60..8aa9007ddedc --- a/pkg/util/lifted/validatingfhpa.go +++ b/pkg/util/lifted/validatingfhpa.go @@ -266,7 +266,6 @@ func validateMetricSpec(spec autoscalingv2.MetricSpec, fldPath *field.Path) fiel var expectedField string switch spec.Type { - case autoscalingv2.ObjectMetricSourceType: if spec.Object == nil { allErrs = append(allErrs, field.Required(fldPath.Child("object"), "must populate information for the given metric source")) diff --git a/test/e2e/framework/events.go b/test/e2e/framework/events.go new file mode 100644 index 000000000000..a4795faa0711 --- /dev/null +++ b/test/e2e/framework/events.go @@ -0,0 +1,39 @@ +/* +Copyright 2024 The Karmada Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "context" + "slices" + + "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/client-go/kubernetes" +) + +// WaitEventFitWith wait PropagationPolicy sync with fit func. +func WaitEventFitWith(kubeClient kubernetes.Interface, namespace string, involvedObj string, fit func(policy corev1.Event) bool) { + gomega.Eventually(func() bool { + eventList, err := kubeClient.CoreV1().Events(namespace).List(context.TODO(), metav1.ListOptions{ + FieldSelector: fields.OneTermEqualSelector("involvedObject.name", involvedObj).String(), + }) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + return slices.ContainsFunc(eventList.Items, fit) + }, pollTimeout, pollInterval).Should(gomega.Equal(true)) +} diff --git a/test/e2e/propagationpolicy_test.go b/test/e2e/propagationpolicy_test.go index 6f8dcd48e6b8..6bef5de30fd7 100644 --- a/test/e2e/propagationpolicy_test.go +++ b/test/e2e/propagationpolicy_test.go @@ -44,6 +44,8 @@ import ( policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" + "github.com/karmada-io/karmada/pkg/controllers/execution" + "github.com/karmada-io/karmada/pkg/events" "github.com/karmada-io/karmada/pkg/util/helper" "github.com/karmada-io/karmada/pkg/util/names" "github.com/karmada-io/karmada/test/e2e/framework" @@ -100,6 +102,15 @@ var _ = ginkgo.Describe("[BasicPropagation] propagation testing", func() { return *deployment.Spec.Replicas == updateDeploymentReplicas }) }) + + ginkgo.It("adds dispatching event with a dispatching message", func() { + workName := names.GenerateWorkName(deployment.Kind, deployment.Name, deployment.Namespace) + esName := names.GenerateExecutionSpaceName(framework.ClusterNames()[0]) + framework.WaitEventFitWith(kubeClient, esName, workName, func(event corev1.Event) bool { + return event.Reason == events.EventReasonWorkDispatching && + event.Message == execution.WorkDispatchingConditionMessage + }) + }) }) ginkgo.Context("Service propagation testing", func() { @@ -1170,7 +1181,7 @@ var _ = ginkgo.Describe("[Suspend] PropagationPolicy testing", func() { }, pollTimeout, pollInterval).Should(gomega.Equal(true)) }) - ginkgo.It("adds suspend dispatching condition to Work\"", func() { + ginkgo.It("adds suspend dispatching condition to Work", func() { workName := names.GenerateWorkName(deployment.Kind, deployment.Name, deployment.Namespace) esName := names.GenerateExecutionSpaceName(targetMember) gomega.Eventually(func() bool { @@ -1181,5 +1192,14 @@ var _ = ginkgo.Describe("[Suspend] PropagationPolicy testing", func() { return work != nil && meta.IsStatusConditionPresentAndEqual(work.Status.Conditions, workv1alpha1.WorkDispatching, metav1.ConditionFalse) }, pollTimeout, pollInterval).Should(gomega.Equal(true)) }) + + ginkgo.It("adds dispatching event with suspend message", func() { + workName := names.GenerateWorkName(deployment.Kind, deployment.Name, deployment.Namespace) + esName := names.GenerateExecutionSpaceName(targetMember) + framework.WaitEventFitWith(kubeClient, esName, workName, func(event corev1.Event) bool { + return event.Reason == events.EventReasonWorkDispatching && + event.Message == execution.WorkSuspendDispatchingConditionMessage + }) + }) }) }) diff --git a/test/helper/resource.go b/test/helper/resource.go index 3456beadb649..09d7954259a4 100644 --- a/test/helper/resource.go +++ b/test/helper/resource.go @@ -960,6 +960,10 @@ func NewPodDisruptionBudget(namespace, name string, maxUnAvailable intstr.IntOrS // NewWork will build a new Work object. func NewWork(workName, workNs, workUID string, raw []byte) *workv1alpha1.Work { work := &workv1alpha1.Work{ + TypeMeta: metav1.TypeMeta{ + Kind: workv1alpha1.ResourceKindWork, + APIVersion: workv1alpha1.GroupVersion.Version, + }, ObjectMeta: metav1.ObjectMeta{ Name: workName, Namespace: workNs,