From e8d861fbede34d34797f06eead0de9cded6fd244 Mon Sep 17 00:00:00 2001 From: Amir Alavi Date: Thu, 8 Aug 2024 22:59:04 -0400 Subject: [PATCH] work suspension: emit event for work dispatch status Signed-off-by: Amir Alavi --- .../execution/execution_controller.go | 20 +++++- .../execution/execution_controller_test.go | 69 +++++++++++++------ pkg/events/events.go | 2 + test/e2e/framework/events.go | 38 ++++++++++ test/e2e/propagationpolicy_test.go | 23 ++++++- test/helper/resource.go | 4 ++ 6 files changed, 131 insertions(+), 25 deletions(-) create mode 100644 test/e2e/framework/events.go diff --git a/pkg/controllers/execution/execution_controller.go b/pkg/controllers/execution/execution_controller.go index 0f1b5906f1b6..0d8d77b2a331 100644 --- a/pkg/controllers/execution/execution_controller.go +++ b/pkg/controllers/execution/execution_controller.go @@ -52,6 +52,10 @@ import ( const ( // ControllerName is the controller name that will be used when reporting events. ControllerName = "execution-controller" + // WorkSuspendDispatchingConditionMessage is the condition and event message when dispatching is suspended. + WorkSuspendDispatchingConditionMessage = "Work dispatching is in a suspended state." + // WorkDispatchingConditionMessage is the condition and event message when dispatching is not suspended. + WorkDispatchingConditionMessage = "Work is being dispatched to member clusters." // workSuspendDispatchingConditionReason is the reason for the WorkDispatching condition when dispatching is suspended. workSuspendDispatchingConditionReason = "SuspendDispatching" // workDispatchingConditionReason is the reason for the WorkDispatching condition when dispatching is not suspended. @@ -270,18 +274,28 @@ func (c *Controller) updateWorkDispatchingConditionIfNeeded(ctx context.Context, if helper.IsWorkSuspendDispatching(work) { newWorkDispatchingCondition.Status = metav1.ConditionFalse newWorkDispatchingCondition.Reason = workSuspendDispatchingConditionReason - newWorkDispatchingCondition.Message = "Work dispatching is in a suspended state." + newWorkDispatchingCondition.Message = WorkSuspendDispatchingConditionMessage } else { newWorkDispatchingCondition.Status = metav1.ConditionTrue newWorkDispatchingCondition.Reason = workDispatchingConditionReason - newWorkDispatchingCondition.Message = "Work is being dispatched to member clusters." + newWorkDispatchingCondition.Message = WorkDispatchingConditionMessage } if meta.IsStatusConditionPresentAndEqual(work.Status.Conditions, newWorkDispatchingCondition.Type, newWorkDispatchingCondition.Status) { return nil } - return c.setStatusCondition(ctx, work, newWorkDispatchingCondition) + if err := c.setStatusCondition(ctx, work, newWorkDispatchingCondition); err != nil { + return err + } + + obj, err := helper.ToUnstructured(work) + if err != nil { + return err + } + + c.eventf(obj, corev1.EventTypeNormal, events.EventReasonWorkDispatching, newWorkDispatchingCondition.Message) + return nil } // updateAppliedCondition updates the applied condition for the given Work. diff --git a/pkg/controllers/execution/execution_controller_test.go b/pkg/controllers/execution/execution_controller_test.go index 5c8500a3c898..5c7883ed7d97 100644 --- a/pkg/controllers/execution/execution_controller_test.go +++ b/pkg/controllers/execution/execution_controller_test.go @@ -17,19 +17,24 @@ package execution import ( "context" + "encoding/json" + "fmt" "testing" "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/client-go/tools/record" "k8s.io/utils/ptr" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client/fake" clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" + "github.com/karmada-io/karmada/pkg/events" "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" "github.com/karmada-io/karmada/pkg/util/gclient" "github.com/karmada-io/karmada/pkg/util/helper" @@ -38,34 +43,50 @@ import ( func TestExecutionController_Reconcile(t *testing.T) { tests := []struct { - name string - work *workv1alpha1.Work - ns string - expectRes controllerruntime.Result - expectCondition *metav1.Condition - existErr bool + name string + work *workv1alpha1.Work + ns string + expectRes controllerruntime.Result + expectCondition *metav1.Condition + expectEventMessage string + existErr bool }{ { - name: "work dispatching is suspended, no error, no apply", - work: newWork(func(work *workv1alpha1.Work) { - work.Spec.SuspendDispatching = ptr.To(true) - }), + name: "work dispatching is suspended, no error, no apply", ns: "karmada-es-cluster", expectRes: controllerruntime.Result{}, existErr: false, + work: newWork(func(work *workv1alpha1.Work) { + work.Spec.SuspendDispatching = ptr.To(true) + }), + }, + { + name: "work dispatching is suspended, adds false dispatching condition", + ns: "karmada-es-cluster", + expectRes: controllerruntime.Result{}, + expectCondition: &metav1.Condition{Type: workv1alpha1.WorkDispatching, Status: metav1.ConditionFalse}, + existErr: false, + + work: newWork(func(w *workv1alpha1.Work) { + w.Spec.SuspendDispatching = ptr.To(true) + }), }, { - name: "work dispatching is suspended, adds false dispatching condition", + name: "work dispatching is suspended, adds event message", + ns: "karmada-es-cluster", + expectRes: controllerruntime.Result{}, + expectEventMessage: fmt.Sprintf("%s %s %s", corev1.EventTypeNormal, events.EventReasonWorkDispatching, WorkSuspendDispatchingConditionMessage), + existErr: false, work: newWork(func(w *workv1alpha1.Work) { w.Spec.SuspendDispatching = ptr.To(true) }), + }, + { + name: "work dispatching is suspended, overwrites existing dispatching condition", ns: "karmada-es-cluster", expectRes: controllerruntime.Result{}, expectCondition: &metav1.Condition{Type: workv1alpha1.WorkDispatching, Status: metav1.ConditionFalse}, existErr: false, - }, - { - name: "work dispatching is suspended, overwrites existing dispatching condition", work: newWork(func(w *workv1alpha1.Work) { w.Spec.SuspendDispatching = ptr.To(true) meta.SetStatusCondition(&w.Status.Conditions, metav1.Condition{ @@ -74,10 +95,6 @@ func TestExecutionController_Reconcile(t *testing.T) { Reason: workDispatchingConditionReason, }) }), - ns: "karmada-es-cluster", - expectRes: controllerruntime.Result{}, - expectCondition: &metav1.Condition{Type: workv1alpha1.WorkDispatching, Status: metav1.ConditionFalse}, - existErr: false, }, } @@ -90,7 +107,8 @@ func TestExecutionController_Reconcile(t *testing.T) { }, } - c := newController(tt.work) + eventRecorder := record.NewFakeRecorder(1) + c := newController(tt.work, eventRecorder) res, err := c.Reconcile(context.Background(), req) assert.Equal(t, tt.expectRes, res) if tt.existErr { @@ -103,21 +121,30 @@ func TestExecutionController_Reconcile(t *testing.T) { assert.NoError(t, c.Client.Get(context.Background(), req.NamespacedName, tt.work)) assert.True(t, meta.IsStatusConditionPresentAndEqual(tt.work.Status.Conditions, tt.expectCondition.Type, tt.expectCondition.Status)) } + + if tt.expectEventMessage != "" { + assert.Equal(t, 1, len(eventRecorder.Events)) + e := <-eventRecorder.Events + assert.Equal(t, tt.expectEventMessage, e) + } }) } } -func newController(work *workv1alpha1.Work) Controller { +func newController(work *workv1alpha1.Work, eventRecorder *record.FakeRecorder) Controller { cluster := newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue) return Controller{ Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(cluster, work).WithStatusSubresource(work).Build(), InformerManager: genericmanager.GetInstance(), PredicateFunc: helper.NewClusterPredicateOnAgent("test"), + EventRecorder: eventRecorder, } } func newWork(applyFunc func(work *workv1alpha1.Work)) *workv1alpha1.Work { - work := testhelper.NewWork("work", "karmada-es-cluster", string(uuid.NewUUID()), nil) + pod := testhelper.NewPod("default", "test") + bytes, _ := json.Marshal(pod) + work := testhelper.NewWork("work", "karmada-es-cluster", string(uuid.NewUUID()), bytes) if applyFunc != nil { applyFunc(work) } diff --git a/pkg/events/events.go b/pkg/events/events.go index ea94a2cf99dd..8e3e945b2247 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -54,6 +54,8 @@ const ( EventReasonSyncWorkloadFailed = "SyncFailed" // EventReasonSyncWorkloadSucceed indicates that Sync workload succeed. EventReasonSyncWorkloadSucceed = "SyncSucceed" + // EventReasonWorkDispatching indicates that work is dispatching or not. + EventReasonWorkDispatching = "WorkDispatching" ) // Define events for ResourceBinding and ClusterResourceBinding objects. diff --git a/test/e2e/framework/events.go b/test/e2e/framework/events.go new file mode 100644 index 000000000000..65e0237e583c --- /dev/null +++ b/test/e2e/framework/events.go @@ -0,0 +1,38 @@ +/* +Copyright 2024 The Karmada Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package framework + +import ( + "context" + "slices" + + "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/client-go/kubernetes" +) + +// WaitEventFitWith wait PropagationPolicy sync with fit func. +func WaitEventFitWith(kubeClient kubernetes.Interface, namespace string, involvedObj string, fit func(policy corev1.Event) bool) { + gomega.Eventually(func() bool { + eventList, err := kubeClient.CoreV1().Events(namespace).List(context.TODO(), metav1.ListOptions{ + FieldSelector: fields.OneTermEqualSelector("involvedObject.name", involvedObj).String(), + }) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + return slices.ContainsFunc(eventList.Items, fit) + }, pollTimeout, pollInterval).Should(gomega.Equal(true)) +} diff --git a/test/e2e/propagationpolicy_test.go b/test/e2e/propagationpolicy_test.go index 6f8dcd48e6b8..2e91b78b5455 100644 --- a/test/e2e/propagationpolicy_test.go +++ b/test/e2e/propagationpolicy_test.go @@ -41,6 +41,9 @@ import ( "k8s.io/klog/v2" "k8s.io/utils/ptr" + "github.com/karmada-io/karmada/pkg/controllers/execution" + "github.com/karmada-io/karmada/pkg/events" + policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" @@ -100,6 +103,15 @@ var _ = ginkgo.Describe("[BasicPropagation] propagation testing", func() { return *deployment.Spec.Replicas == updateDeploymentReplicas }) }) + + ginkgo.It("adds dispatching event with a dispatching message", func() { + workName := names.GenerateWorkName(deployment.Kind, deployment.Name, deployment.Namespace) + esName := names.GenerateExecutionSpaceName(framework.ClusterNames()[0]) + framework.WaitEventFitWith(kubeClient, esName, workName, func(event corev1.Event) bool { + return event.Reason == events.EventReasonWorkDispatching && + event.Message == execution.WorkDispatchingConditionMessage + }) + }) }) ginkgo.Context("Service propagation testing", func() { @@ -1170,7 +1182,7 @@ var _ = ginkgo.Describe("[Suspend] PropagationPolicy testing", func() { }, pollTimeout, pollInterval).Should(gomega.Equal(true)) }) - ginkgo.It("adds suspend dispatching condition to Work\"", func() { + ginkgo.It("adds suspend dispatching condition to Work", func() { workName := names.GenerateWorkName(deployment.Kind, deployment.Name, deployment.Namespace) esName := names.GenerateExecutionSpaceName(targetMember) gomega.Eventually(func() bool { @@ -1181,5 +1193,14 @@ var _ = ginkgo.Describe("[Suspend] PropagationPolicy testing", func() { return work != nil && meta.IsStatusConditionPresentAndEqual(work.Status.Conditions, workv1alpha1.WorkDispatching, metav1.ConditionFalse) }, pollTimeout, pollInterval).Should(gomega.Equal(true)) }) + + ginkgo.It("adds dispatching event with suspend message", func() { + workName := names.GenerateWorkName(deployment.Kind, deployment.Name, deployment.Namespace) + esName := names.GenerateExecutionSpaceName(targetMember) + framework.WaitEventFitWith(kubeClient, esName, workName, func(event corev1.Event) bool { + return event.Reason == events.EventReasonWorkDispatching && + event.Message == execution.WorkSuspendDispatchingConditionMessage + }) + }) }) }) diff --git a/test/helper/resource.go b/test/helper/resource.go index 3456beadb649..09d7954259a4 100644 --- a/test/helper/resource.go +++ b/test/helper/resource.go @@ -960,6 +960,10 @@ func NewPodDisruptionBudget(namespace, name string, maxUnAvailable intstr.IntOrS // NewWork will build a new Work object. func NewWork(workName, workNs, workUID string, raw []byte) *workv1alpha1.Work { work := &workv1alpha1.Work{ + TypeMeta: metav1.TypeMeta{ + Kind: workv1alpha1.ResourceKindWork, + APIVersion: workv1alpha1.GroupVersion.Version, + }, ObjectMeta: metav1.ObjectMeta{ Name: workName, Namespace: workNs,