diff --git a/pkg/apis/work/v1alpha2/binding_types.go b/pkg/apis/work/v1alpha2/binding_types.go index a4cebc549687..5c8b65e8cd3e 100644 --- a/pkg/apis/work/v1alpha2/binding_types.go +++ b/pkg/apis/work/v1alpha2/binding_types.go @@ -398,6 +398,9 @@ const ( // FullyApplied represents the condition that the resource referencing by ResourceBinding or ClusterResourceBinding // has been applied to all scheduled clusters. FullyApplied string = "FullyApplied" + + // SchedulingSuspended represents the condition that the ResourceBinding or ClusterResourceBinding is suspended to schedule. + SchedulingSuspended string = "SchedulingSuspended" ) // These are reasons for a binding's transition to a Scheduled condition. diff --git a/pkg/controllers/binding/binding_controller.go b/pkg/controllers/binding/binding_controller.go index 52236646fbcc..7c9ecb0ce63f 100644 --- a/pkg/controllers/binding/binding_controller.go +++ b/pkg/controllers/binding/binding_controller.go @@ -89,6 +89,11 @@ func (c *ResourceBindingController) Reconcile(ctx context.Context, req controlle return c.removeFinalizer(ctx, binding) } + if err := updateBindingDispatchingConditionIfNeeded(ctx, c.Client, c.EventRecorder, binding, apiextensionsv1.NamespaceScoped); err != nil { + klog.ErrorS(err, "Failed to update binding condition.", "name", klog.KObj(binding), "type", workv1alpha2.SchedulingSuspended) + return controllerruntime.Result{}, err + } + return c.syncBinding(ctx, binding) } diff --git a/pkg/controllers/binding/binding_controller_test.go b/pkg/controllers/binding/binding_controller_test.go index cef6aeefce83..a279fc6fdb4c 100644 --- a/pkg/controllers/binding/binding_controller_test.go +++ b/pkg/controllers/binding/binding_controller_test.go @@ -22,15 +22,20 @@ import ( "reflect" "testing" + "github.com/stretchr/testify/assert" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/uuid" fakedynamic "k8s.io/client-go/dynamic/fake" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/record" + "k8s.io/utils/ptr" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" @@ -39,6 +44,7 @@ import ( policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" + "github.com/karmada-io/karmada/pkg/events" testing2 "github.com/karmada-io/karmada/pkg/search/proxy/testing" "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" @@ -445,3 +451,119 @@ func TestResourceBindingController_removeFinalizer(t *testing.T) { }) } } + +func TestUpdateBindingDispatchingConditionIfNeeded(t *testing.T) { + tests := []struct { + name string + binding *workv1alpha2.ResourceBinding + expectedCondition metav1.Condition + expectedEventCount int + expectEventMessage string + }{ + { + name: "Binding scheduling is suspended", + binding: newRb(true, metav1.Condition{}), + expectedCondition: metav1.Condition{ + Type: workv1alpha2.SchedulingSuspended, + Status: metav1.ConditionTrue, + }, + expectedEventCount: 1, + expectEventMessage: fmt.Sprintf("%s %s %s", corev1.EventTypeNormal, events.EventReasonBindingScheduling, SuspendedSchedulingConditionMessage), + }, + { + name: "Binding scheduling is not suspended", + binding: newRb(false, metav1.Condition{ + Type: workv1alpha2.SchedulingSuspended, + Status: metav1.ConditionTrue, + Reason: SuspendedSchedulingConditionReason, + Message: SuspendedSchedulingConditionMessage, + }), + expectedCondition: metav1.Condition{ + Type: workv1alpha2.SchedulingSuspended, + Status: metav1.ConditionFalse, + }, + expectedEventCount: 1, + expectEventMessage: fmt.Sprintf("%s %s %s", corev1.EventTypeNormal, events.EventReasonBindingScheduling, SchedulingConditionMessage), + }, + { + name: "Condition already matches, no update needed", + binding: newRb(true, metav1.Condition{ + Type: workv1alpha2.SchedulingSuspended, + Status: metav1.ConditionTrue, + Reason: SuspendedSchedulingConditionReason, + Message: SuspendedSchedulingConditionMessage, + }), + expectedCondition: metav1.Condition{ + Type: workv1alpha2.SchedulingSuspended, + Status: metav1.ConditionTrue, + }, + }, + { + name: "No SchedulingSuspended condition and scheduling is not suspended, no update needed", + binding: newRb(false, metav1.Condition{ + Type: workv1alpha2.BindingReasonUnschedulable, + Status: metav1.ConditionTrue, + }), + expectedCondition: metav1.Condition{ + Type: workv1alpha2.BindingReasonUnschedulable, + Status: metav1.ConditionTrue, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + eventRecorder := record.NewFakeRecorder(1) + c := newResourceBindingController(tt.binding, eventRecorder) + + updatedBinding := &workv1alpha2.ResourceBinding{} + assert.NoError(t, c.Get(context.Background(), types.NamespacedName{Name: tt.binding.Name, Namespace: tt.binding.Namespace}, updatedBinding)) + + err := updateBindingDispatchingConditionIfNeeded(context.Background(), c.Client, c.EventRecorder, tt.binding, apiextensionsv1.NamespaceScoped) + if err != nil { + t.Errorf("updateBindingDispatchingConditionIfNeeded() returned an error: %v", err) + } + + assert.NoError(t, c.Get(context.Background(), types.NamespacedName{Name: tt.binding.Name, Namespace: tt.binding.Namespace}, updatedBinding)) + assert.True(t, meta.IsStatusConditionPresentAndEqual(updatedBinding.Status.Conditions, tt.expectedCondition.Type, tt.expectedCondition.Status)) + assert.Equal(t, tt.expectedEventCount, len(eventRecorder.Events)) + if tt.expectEventMessage != "" { + e := <-eventRecorder.Events + assert.Equal(t, tt.expectEventMessage, e) + } + }) + } +} + +func newResourceBindingController(binding *workv1alpha2.ResourceBinding, eventRecord record.EventRecorder) ResourceBindingController { + restMapper := meta.NewDefaultRESTMapper([]schema.GroupVersion{corev1.SchemeGroupVersion}) + fakeClient := fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(binding).WithStatusSubresource(binding).WithRESTMapper(restMapper).Build() + return ResourceBindingController{ + Client: fakeClient, + EventRecorder: eventRecord, + } +} + +func newRb(suspended bool, condition metav1.Condition) *workv1alpha2.ResourceBinding { + return &workv1alpha2.ResourceBinding{ + TypeMeta: metav1.TypeMeta{ + Kind: workv1alpha2.ResourceKindResourceBinding, + APIVersion: workv1alpha2.GroupVersion.Version, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-rb", + Namespace: "default", + UID: uuid.NewUUID(), + }, + Spec: workv1alpha2.ResourceBindingSpec{ + Suspension: &workv1alpha2.Suspension{ + Scheduling: ptr.To(suspended), + }, + }, + Status: workv1alpha2.ResourceBindingStatus{ + Conditions: []metav1.Condition{ + condition, + }, + }, + } +} diff --git a/pkg/controllers/binding/cluster_resource_binding_controller.go b/pkg/controllers/binding/cluster_resource_binding_controller.go index 89cf6759e0bf..8d95e82d121c 100644 --- a/pkg/controllers/binding/cluster_resource_binding_controller.go +++ b/pkg/controllers/binding/cluster_resource_binding_controller.go @@ -89,6 +89,11 @@ func (c *ClusterResourceBindingController) Reconcile(ctx context.Context, req co return c.removeFinalizer(ctx, clusterResourceBinding) } + if err := updateBindingDispatchingConditionIfNeeded(ctx, c.Client, c.EventRecorder, clusterResourceBinding, apiextensionsv1.ClusterScoped); err != nil { + klog.ErrorS(err, "Failed to update binding condition.", "name", klog.KObj(clusterResourceBinding), "type", workv1alpha2.SchedulingSuspended) + return controllerruntime.Result{}, err + } + return c.syncBinding(ctx, clusterResourceBinding) } diff --git a/pkg/controllers/binding/cluster_resource_binding_controller_test.go b/pkg/controllers/binding/cluster_resource_binding_controller_test.go index 4abbbe1c8563..3eb7adbd17f1 100644 --- a/pkg/controllers/binding/cluster_resource_binding_controller_test.go +++ b/pkg/controllers/binding/cluster_resource_binding_controller_test.go @@ -22,15 +22,20 @@ import ( "reflect" "testing" + "github.com/stretchr/testify/assert" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/uuid" fakedynamic "k8s.io/client-go/dynamic/fake" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/record" + "k8s.io/utils/ptr" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" @@ -39,6 +44,7 @@ import ( policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" + "github.com/karmada-io/karmada/pkg/events" testing2 "github.com/karmada-io/karmada/pkg/search/proxy/testing" "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" @@ -439,3 +445,119 @@ func TestClusterResourceBindingController_newOverridePolicyFunc(t *testing.T) { }) } } + +func TestUpdateClusterBindingDispatchingConditionIfNeeded(t *testing.T) { + tests := []struct { + name string + binding *workv1alpha2.ClusterResourceBinding + expectedCondition metav1.Condition + expectedEventCount int + expectEventMessage string + }{ + { + name: "Binding scheduling is suspended", + binding: newCrb(true, metav1.Condition{}), + expectedCondition: metav1.Condition{ + Type: workv1alpha2.SchedulingSuspended, + Status: metav1.ConditionTrue, + }, + expectedEventCount: 1, + expectEventMessage: fmt.Sprintf("%s %s %s", corev1.EventTypeNormal, events.EventReasonBindingScheduling, SuspendedSchedulingConditionMessage), + }, + { + name: "Binding scheduling is not suspended", + binding: newCrb(false, metav1.Condition{ + Type: workv1alpha2.SchedulingSuspended, + Status: metav1.ConditionTrue, + Reason: SuspendedSchedulingConditionReason, + Message: SuspendedSchedulingConditionMessage, + }), + expectedCondition: metav1.Condition{ + Type: workv1alpha2.SchedulingSuspended, + Status: metav1.ConditionFalse, + }, + expectedEventCount: 1, + expectEventMessage: fmt.Sprintf("%s %s %s", corev1.EventTypeNormal, events.EventReasonBindingScheduling, SchedulingConditionMessage), + }, + { + name: "Condition already matches, no update needed", + binding: newCrb(true, metav1.Condition{ + Type: workv1alpha2.SchedulingSuspended, + Status: metav1.ConditionTrue, + Reason: SuspendedSchedulingConditionReason, + Message: SuspendedSchedulingConditionMessage, + }), + expectedCondition: metav1.Condition{ + Type: workv1alpha2.SchedulingSuspended, + Status: metav1.ConditionTrue, + }, + }, + { + name: "No SchedulingSuspended condition and scheduling is not suspended, no update needed", + binding: newCrb(false, metav1.Condition{ + Type: workv1alpha2.BindingReasonUnschedulable, + Status: metav1.ConditionTrue, + }), + expectedCondition: metav1.Condition{ + Type: workv1alpha2.BindingReasonUnschedulable, + Status: metav1.ConditionTrue, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + eventRecorder := record.NewFakeRecorder(1) + c := newClusterResourceBindingController(tt.binding, eventRecorder) + + updatedBinding := &workv1alpha2.ClusterResourceBinding{} + assert.NoError(t, c.Get(context.Background(), types.NamespacedName{Name: tt.binding.Name, Namespace: tt.binding.Namespace}, updatedBinding)) + + err := updateBindingDispatchingConditionIfNeeded(context.Background(), c.Client, c.EventRecorder, tt.binding, apiextensionsv1.ClusterScoped) + if err != nil { + t.Errorf("updateBindingDispatchingConditionIfNeeded() returned an error: %v", err) + } + + assert.NoError(t, c.Get(context.Background(), types.NamespacedName{Name: tt.binding.Name, Namespace: tt.binding.Namespace}, updatedBinding)) + assert.True(t, meta.IsStatusConditionPresentAndEqual(updatedBinding.Status.Conditions, tt.expectedCondition.Type, tt.expectedCondition.Status)) + assert.Equal(t, tt.expectedEventCount, len(eventRecorder.Events)) + if tt.expectEventMessage != "" { + e := <-eventRecorder.Events + assert.Equal(t, tt.expectEventMessage, e) + } + }) + } +} + +func newClusterResourceBindingController(binding *workv1alpha2.ClusterResourceBinding, eventRecord record.EventRecorder) ClusterResourceBindingController { + restMapper := meta.NewDefaultRESTMapper([]schema.GroupVersion{corev1.SchemeGroupVersion}) + fakeClient := fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(binding).WithStatusSubresource(binding).WithRESTMapper(restMapper).Build() + return ClusterResourceBindingController{ + Client: fakeClient, + EventRecorder: eventRecord, + } +} + +func newCrb(suspended bool, condition metav1.Condition) *workv1alpha2.ClusterResourceBinding { + return &workv1alpha2.ClusterResourceBinding{ + TypeMeta: metav1.TypeMeta{ + Kind: workv1alpha2.ResourceKindResourceBinding, + APIVersion: workv1alpha2.GroupVersion.Version, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-rb", + Namespace: "default", + UID: uuid.NewUUID(), + }, + Spec: workv1alpha2.ResourceBindingSpec{ + Suspension: &workv1alpha2.Suspension{ + Scheduling: ptr.To(suspended), + }, + }, + Status: workv1alpha2.ResourceBindingStatus{ + Conditions: []metav1.Condition{ + condition, + }, + }, + } +} diff --git a/pkg/controllers/binding/common.go b/pkg/controllers/binding/common.go index 2280c840c4dd..318ee884f2fa 100644 --- a/pkg/controllers/binding/common.go +++ b/pkg/controllers/binding/common.go @@ -20,10 +20,13 @@ import ( "context" "strconv" + corev1 "k8s.io/api/core/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/tools/record" "k8s.io/klog/v2" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" @@ -31,6 +34,7 @@ import ( configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1" policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" + "github.com/karmada-io/karmada/pkg/events" "github.com/karmada-io/karmada/pkg/features" "github.com/karmada-io/karmada/pkg/resourceinterpreter" "github.com/karmada-io/karmada/pkg/util" @@ -39,6 +43,18 @@ import ( "github.com/karmada-io/karmada/pkg/util/overridemanager" ) +const ( + // SuspendedSchedulingConditionMessage is the condition and event message when scheduling is suspended. + SuspendedSchedulingConditionMessage = "Binding scheduling is suspended." + // SchedulingConditionMessage is the condition and event message when scheduling is not suspended. + SchedulingConditionMessage = "Binding scheduling resumed." + + // SuspendedSchedulingConditionReason is the reason for the Suspending condition when scheduling is suspended. + SuspendedSchedulingConditionReason = "SchedulingSuspended" + // ResumedConditionReason is the reason for the Suspending condition when scheduling is not suspended. + ResumedConditionReason = "SchedulingResumed" +) + // ensureWork ensure Work to be created or updated. func ensureWork( ctx context.Context, c client.Client, resourceInterpreter resourceinterpreter.ResourceInterpreter, workload *unstructured.Unstructured, @@ -319,3 +335,72 @@ func shouldSuspendDispatching(suspension *workv1alpha2.Suspension, targetCluster } return suspendDispatching } + +func updateBindingDispatchingConditionIfNeeded(ctx context.Context, client client.Client, eventRecorder record.EventRecorder, binding client.Object, scope apiextensionsv1.ResourceScope) error { + newBindingSchedulingCondition := metav1.Condition{ + Type: workv1alpha2.SchedulingSuspended, + LastTransitionTime: metav1.Now(), + } + + bindingSuspendScheduling := false + var conditions *[]metav1.Condition + switch scope { + case apiextensionsv1.NamespaceScoped: + bindingObj := binding.(*workv1alpha2.ResourceBinding) + bindingSuspendScheduling = util.IsBindingSuspendScheduling(bindingObj) + conditions = &bindingObj.Status.Conditions + case apiextensionsv1.ClusterScoped: + bindingObj := binding.(*workv1alpha2.ClusterResourceBinding) + bindingSuspendScheduling = util.IsClusterBindingSuspendScheduling(bindingObj) + conditions = &bindingObj.Status.Conditions + default: + klog.ErrorS(nil, "Unsupported resource binding scope") + return nil + } + + if bindingSuspendScheduling { + newBindingSchedulingCondition.Status = metav1.ConditionTrue + newBindingSchedulingCondition.Message = SuspendedSchedulingConditionMessage + newBindingSchedulingCondition.Reason = SuspendedSchedulingConditionReason + } else { + // There is no need to set an extra SchedulingSuspended condition when scheduling not suspended and SchedulingSuspended condition not exists. + if meta.FindStatusCondition(*conditions, workv1alpha2.SchedulingSuspended) == nil { + return nil + } + newBindingSchedulingCondition.Status = metav1.ConditionFalse + newBindingSchedulingCondition.Message = SchedulingConditionMessage + newBindingSchedulingCondition.Reason = ResumedConditionReason + } + + if meta.IsStatusConditionPresentAndEqual(*conditions, newBindingSchedulingCondition.Type, newBindingSchedulingCondition.Status) { + return nil + } + if err := updateStatusCondition(ctx, client, binding, conditions, newBindingSchedulingCondition); err != nil { + return err + } + + obj, err := helper.ToUnstructured(binding) + if err != nil { + return err + } + + eventf(eventRecorder, obj, corev1.EventTypeNormal, events.EventReasonBindingScheduling, newBindingSchedulingCondition.Message) + return nil +} + +func updateStatusCondition(ctx context.Context, client client.Client, binding client.Object, condition *[]metav1.Condition, newCondition metav1.Condition) error { + _, err := helper.UpdateStatus(ctx, client, binding, func() error { + meta.SetStatusCondition(condition, newCondition) + return nil + }) + return err +} + +func eventf(eventRecorder record.EventRecorder, object *unstructured.Unstructured, eventType, reason, messageFmt string, args ...interface{}) { + ref, err := helper.GenEventRef(object) + if err != nil { + klog.Errorf("Ignore event(%s) as failed to build event reference for: kind=%s, %s due to %v", reason, object.GetKind(), klog.KObj(object), err) + return + } + eventRecorder.Eventf(ref, eventType, reason, messageFmt, args...) +} diff --git a/pkg/events/events.go b/pkg/events/events.go index 8e3e945b2247..21b830847b0d 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -66,6 +66,8 @@ const ( EventReasonSyncScheduleResultToDependenciesSucceed = "SyncScheduleResultToDependenciesSucceed" // EventReasonSyncScheduleResultToDependenciesFailed indicates sync schedule result to attached bindings failed. EventReasonSyncScheduleResultToDependenciesFailed = "SyncScheduleResultToDependenciesFailed" + // EventReasonBindingScheduling indicates that binding is scheduling or not. + EventReasonBindingScheduling = "BindingScheduling" ) // Define events for ResourceBinding, ClusterResourceBinding objects and their associated resources. diff --git a/pkg/scheduler/event_handler.go b/pkg/scheduler/event_handler.go index c400eae5eee4..a37af1f029be 100644 --- a/pkg/scheduler/event_handler.go +++ b/pkg/scheduler/event_handler.go @@ -101,10 +101,16 @@ func (s *Scheduler) resourceBindingEventFilter(obj interface{}) bool { if !schedulerNameFilter(s.schedulerName, t.Spec.SchedulerName) { return false } + if util.IsBindingSuspendScheduling(t) { + return false + } case *workv1alpha2.ClusterResourceBinding: if !schedulerNameFilter(s.schedulerName, t.Spec.SchedulerName) { return false } + if util.IsClusterBindingSuspendScheduling(t) { + return false + } } return util.GetLabelValue(accessor.GetLabels(), policyv1alpha1.PropagationPolicyPermanentIDLabel) != "" || diff --git a/pkg/scheduler/event_handler_test.go b/pkg/scheduler/event_handler_test.go index fbf7088aeb7c..92f844c2909e 100644 --- a/pkg/scheduler/event_handler_test.go +++ b/pkg/scheduler/event_handler_test.go @@ -24,6 +24,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/cache" + "k8s.io/utils/ptr" clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" @@ -40,13 +41,13 @@ func TestResourceBindingEventFilter(t *testing.T) { { name: "ResourceBinding: Matching scheduler name, no labels", schedulerName: "test-scheduler", - obj: createResourceBinding("test-rb", "test-scheduler", nil), + obj: createResourceBinding("test-rb", "test-scheduler", nil, nil), expectedResult: false, }, { name: "ResourceBinding: Non-matching scheduler name", schedulerName: "test-scheduler", - obj: createResourceBinding("test-rb", "other-scheduler", nil), + obj: createResourceBinding("test-rb", "other-scheduler", nil, nil), expectedResult: false, }, { @@ -54,7 +55,7 @@ func TestResourceBindingEventFilter(t *testing.T) { schedulerName: "test-scheduler", obj: createResourceBinding("test-rb", "test-scheduler", map[string]string{ policyv1alpha1.PropagationPolicyPermanentIDLabel: "test-id", - }), + }, nil), expectedResult: true, }, { @@ -62,7 +63,7 @@ func TestResourceBindingEventFilter(t *testing.T) { schedulerName: "test-scheduler", obj: createResourceBinding("test-rb", "test-scheduler", map[string]string{ policyv1alpha1.ClusterPropagationPolicyPermanentIDLabel: "test-id", - }), + }, nil), expectedResult: true, }, { @@ -70,7 +71,7 @@ func TestResourceBindingEventFilter(t *testing.T) { schedulerName: "test-scheduler", obj: createResourceBinding("test-rb", "test-scheduler", map[string]string{ workv1alpha2.BindingManagedByLabel: "test-manager", - }), + }, nil), expectedResult: true, }, { @@ -78,19 +79,19 @@ func TestResourceBindingEventFilter(t *testing.T) { schedulerName: "test-scheduler", obj: createResourceBinding("test-rb", "test-scheduler", map[string]string{ policyv1alpha1.PropagationPolicyPermanentIDLabel: "", - }), + }, nil), expectedResult: false, }, { name: "ClusterResourceBinding: Matching scheduler name, no labels", schedulerName: "test-scheduler", - obj: createClusterResourceBinding("test-crb", "test-scheduler", nil), + obj: createClusterResourceBinding("test-crb", "test-scheduler", nil, nil), expectedResult: false, }, { name: "ClusterResourceBinding: Non-matching scheduler name", schedulerName: "test-scheduler", - obj: createClusterResourceBinding("test-crb", "other-scheduler", nil), + obj: createClusterResourceBinding("test-crb", "other-scheduler", nil, nil), expectedResult: false, }, { @@ -98,7 +99,7 @@ func TestResourceBindingEventFilter(t *testing.T) { schedulerName: "test-scheduler", obj: createClusterResourceBinding("test-crb", "test-scheduler", map[string]string{ policyv1alpha1.ClusterPropagationPolicyPermanentIDLabel: "test-id", - }), + }, nil), expectedResult: true, }, { @@ -113,6 +114,22 @@ func TestResourceBindingEventFilter(t *testing.T) { obj: "not-a-valid-object", expectedResult: false, }, + { + name: "ResourceBinding suspended", + schedulerName: "test-scheduler", + obj: createResourceBinding("test-rb", "test-scheduler", map[string]string{ + workv1alpha2.BindingManagedByLabel: "test-manager", + }, &workv1alpha2.Suspension{Scheduling: ptr.To(true)}), + expectedResult: false, + }, + { + name: "ClusterResourceBinding suspended", + schedulerName: "test-scheduler", + obj: createClusterResourceBinding("test-crb", "test-scheduler", map[string]string{ + policyv1alpha1.ClusterPropagationPolicyPermanentIDLabel: "test-id", + }, &workv1alpha2.Suspension{Scheduling: ptr.To(true)}), + expectedResult: false, + }, } for _, tc := range testCases { @@ -404,7 +421,7 @@ func createCluster(name string, generation int64, labels map[string]string) *clu } } -func createResourceBinding(name, schedulerName string, labels map[string]string) *workv1alpha2.ResourceBinding { +func createResourceBinding(name, schedulerName string, labels map[string]string, suspension *workv1alpha2.Suspension) *workv1alpha2.ResourceBinding { return &workv1alpha2.ResourceBinding{ ObjectMeta: metav1.ObjectMeta{ Name: name, @@ -412,11 +429,12 @@ func createResourceBinding(name, schedulerName string, labels map[string]string) }, Spec: workv1alpha2.ResourceBindingSpec{ SchedulerName: schedulerName, + Suspension: suspension, }, } } -func createClusterResourceBinding(name, schedulerName string, labels map[string]string) *workv1alpha2.ClusterResourceBinding { +func createClusterResourceBinding(name, schedulerName string, labels map[string]string, suspension *workv1alpha2.Suspension) *workv1alpha2.ClusterResourceBinding { return &workv1alpha2.ClusterResourceBinding{ ObjectMeta: metav1.ObjectMeta{ Name: name, @@ -424,6 +442,7 @@ func createClusterResourceBinding(name, schedulerName string, labels map[string] }, Spec: workv1alpha2.ResourceBindingSpec{ SchedulerName: schedulerName, + Suspension: suspension, }, } } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index daaf7135bfc7..cdc5d1074364 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -357,7 +357,6 @@ func (s *Scheduler) doScheduleBinding(namespace, name string) (err error) { klog.V(4).InfoS("Skip schedule deleting ResourceBinding", "ResourceBinding", klog.KObj(rb)) return nil } - rb = rb.DeepCopy() if rb.Spec.Placement == nil { @@ -427,7 +426,6 @@ func (s *Scheduler) doScheduleClusterBinding(name string) (err error) { klog.V(4).InfoS("Skip schedule deleting ClusterResourceBinding", "ClusterResourceBinding", klog.KObj(crb)) return nil } - crb = crb.DeepCopy() if crb.Spec.Placement == nil { @@ -849,7 +847,7 @@ func patchBindingStatusWithAffinityName(karmadaClient karmadaclientset.Interface } func patchBindingStatus(karmadaClient karmadaclientset.Interface, rb, updateRB *workv1alpha2.ResourceBinding) error { - patchBytes, err := helper.GenFieldMergePatch("status", rb.Status, updateRB.Status) + patchBytes, err := helper.GenJSONPatch(workv1alpha2.ResourceBinding{Status: rb.Status}, workv1alpha2.ResourceBinding{Status: updateRB.Status}) if err != nil { return err } @@ -857,7 +855,7 @@ func patchBindingStatus(karmadaClient karmadaclientset.Interface, rb, updateRB * return nil } - _, err = karmadaClient.WorkV1alpha2().ResourceBindings(rb.Namespace).Patch(context.TODO(), rb.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status") + _, err = karmadaClient.WorkV1alpha2().ResourceBindings(rb.Namespace).Patch(context.TODO(), rb.Name, types.JSONPatchType, patchBytes, metav1.PatchOptions{}, "status") if err != nil { klog.Errorf("Failed to patch schedule status ResourceBinding(%s/%s): %v", rb.Namespace, rb.Name, err) return err @@ -900,7 +898,7 @@ func patchClusterBindingStatusWithAffinityName(karmadaClient karmadaclientset.In } func patchClusterResourceBindingStatus(karmadaClient karmadaclientset.Interface, crb, updateCRB *workv1alpha2.ClusterResourceBinding) error { - patchBytes, err := helper.GenFieldMergePatch("status", crb.Status, updateCRB.Status) + patchBytes, err := helper.GenJSONPatch(workv1alpha2.ResourceBinding{Status: crb.Status}, workv1alpha2.ResourceBinding{Status: updateCRB.Status}) if err != nil { return err } @@ -908,7 +906,7 @@ func patchClusterResourceBindingStatus(karmadaClient karmadaclientset.Interface, return nil } - _, err = karmadaClient.WorkV1alpha2().ClusterResourceBindings().Patch(context.TODO(), crb.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status") + _, err = karmadaClient.WorkV1alpha2().ClusterResourceBindings().Patch(context.TODO(), crb.Name, types.JSONPatchType, patchBytes, metav1.PatchOptions{}, "status") if err != nil { klog.Errorf("Failed to patch schedule status to ClusterResourceBinding(%s): %v", crb.Name, err) return err diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 59ca28f59474..549899da4339 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -566,7 +566,7 @@ func TestScheduleResourceBindingWithClusterAffinities(t *testing.T) { expectError: false, expectedPatches: []string{ `{"metadata":{"annotations":{"policy.karmada.io/applied-placement":"{\"clusterAffinities\":[{\"affinityName\":\"affinity1\",\"clusterNames\":[\"cluster1\"]},{\"affinityName\":\"affinity2\",\"clusterNames\":[\"cluster2\"]}]}"}},"spec":{"clusters":[{"name":"cluster1","replicas":1}]}}`, - `{"status":{"schedulerObservingAffinityName":"affinity1"}}`, + `[{"op":"add", "path":"/status/schedulerObservingAffinityName", "value":"affinity1"}]`, }, expectedEvent: "Normal ScheduleBindingSucceed Binding has been scheduled successfully.", }, @@ -608,7 +608,7 @@ func TestScheduleResourceBindingWithClusterAffinities(t *testing.T) { expectError: false, expectedPatches: []string{ `{"metadata":{"annotations":{"policy.karmada.io/applied-placement":"{\"clusterAffinities\":[{\"affinityName\":\"affinity1\",\"clusterNames\":[\"cluster1\"]},{\"affinityName\":\"affinity2\",\"clusterNames\":[\"cluster2\"]}]}"}},"spec":{"clusters":[{"name":"cluster2","replicas":1}]}}`, - `{"status":{"schedulerObservingAffinityName":"affinity2"}}`, + `[{"op":"add","path":"/status/schedulerObservingAffinityName","value":"affinity2"}]`, }, expectedEvent: "Warning ScheduleBindingFailed failed to schedule ResourceBinding(default/test-binding-2) with clusterAffiliates index(0): first affinity failed", }, diff --git a/pkg/util/binding.go b/pkg/util/binding.go index 6811984f0ec8..24f804ab1831 100644 --- a/pkg/util/binding.go +++ b/pkg/util/binding.go @@ -110,3 +110,19 @@ func RescheduleRequired(rescheduleTriggeredAt, lastScheduledTime *metav1.Time) b } return rescheduleTriggeredAt.After(lastScheduledTime.Time) } + +// IsBindingSuspendScheduling return whether resource binding is scheduling suspended. +func IsBindingSuspendScheduling(rb *workv1alpha2.ResourceBinding) bool { + if rb == nil || rb.Spec.Suspension == nil || rb.Spec.Suspension.Scheduling == nil { + return false + } + return *rb.Spec.Suspension.Scheduling +} + +// IsClusterBindingSuspendScheduling return whether cluster resource binding is scheduling suspended. +func IsClusterBindingSuspendScheduling(crb *workv1alpha2.ClusterResourceBinding) bool { + if crb == nil || crb.Spec.Suspension == nil || crb.Spec.Suspension.Scheduling == nil { + return false + } + return *crb.Spec.Suspension.Scheduling +} diff --git a/pkg/util/binding_test.go b/pkg/util/binding_test.go index 79355978ba98..48ac8790d48c 100644 --- a/pkg/util/binding_test.go +++ b/pkg/util/binding_test.go @@ -21,8 +21,10 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/utils/ptr" policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" @@ -419,3 +421,115 @@ func TestRescheduleRequired(t *testing.T) { }) } } + +func TestIsBindingSuspendScheduling(t *testing.T) { + tests := []struct { + name string + rb *workv1alpha2.ResourceBinding + expected bool + }{ + { + name: "rb is nil", + rb: nil, + expected: false, + }, + { + name: "rb.Spec.Suspension is nil", + rb: &workv1alpha2.ResourceBinding{}, + expected: false, + }, + { + name: "rb.Spec.Suspension.Scheduling is nil", + rb: &workv1alpha2.ResourceBinding{ + Spec: workv1alpha2.ResourceBindingSpec{ + Suspension: &workv1alpha2.Suspension{}, + }, + }, + expected: false, + }, + { + name: "rb.Spec.Suspension.Scheduling is false", + rb: &workv1alpha2.ResourceBinding{ + Spec: workv1alpha2.ResourceBindingSpec{ + Suspension: &workv1alpha2.Suspension{ + Scheduling: ptr.To(false), + }, + }, + }, + expected: false, + }, + { + name: "rb.Spec.Suspension.Scheduling is true", + rb: &workv1alpha2.ResourceBinding{ + Spec: workv1alpha2.ResourceBindingSpec{ + Suspension: &workv1alpha2.Suspension{ + Scheduling: ptr.To(true), + }, + }, + }, + expected: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.expected, IsBindingSuspendScheduling(tt.rb)) + }) + } +} + +func TestIsClusterBindingSuspendScheduling(t *testing.T) { + tests := []struct { + name string + crb *workv1alpha2.ClusterResourceBinding + expected bool + }{ + { + name: "crb is nil", + crb: nil, + expected: false, + }, + { + name: "crb.Spec.Suspension is nil", + crb: &workv1alpha2.ClusterResourceBinding{}, + expected: false, + }, + { + name: "crb.Spec.Suspension.Scheduling is nil", + crb: &workv1alpha2.ClusterResourceBinding{ + Spec: workv1alpha2.ResourceBindingSpec{ + Suspension: &workv1alpha2.Suspension{}, + }, + }, + expected: false, + }, + { + name: "crb.Spec.Suspension.Scheduling is false", + crb: &workv1alpha2.ClusterResourceBinding{ + Spec: workv1alpha2.ResourceBindingSpec{ + Suspension: &workv1alpha2.Suspension{ + Scheduling: ptr.To(false), + }, + }, + }, + expected: false, + }, + { + name: "crb.Spec.Suspension.Scheduling is true", + crb: &workv1alpha2.ClusterResourceBinding{ + Spec: workv1alpha2.ResourceBindingSpec{ + Suspension: &workv1alpha2.Suspension{ + Scheduling: ptr.To(true), + }, + }, + }, + expected: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.expected, IsClusterBindingSuspendScheduling(tt.crb)) + }) + } +} diff --git a/pkg/util/helper/patch.go b/pkg/util/helper/patch.go index 89f9f798376a..d4e7d4c24388 100644 --- a/pkg/util/helper/patch.go +++ b/pkg/util/helper/patch.go @@ -21,6 +21,7 @@ import ( "fmt" jsonpatch "github.com/evanphx/json-patch/v5" + jsonpatchv2 "gomodules.xyz/jsonpatch/v2" ) // GenMergePatch will return a merge patch document capable of converting the @@ -66,3 +67,27 @@ func GenFieldMergePatch(fieldName string, originField interface{}, modifiedField patchBytes = append([]byte(`{"`+fieldName+`":`), patchBytes...) return patchBytes, nil } + +// GenJSONPatch returns a rfc6902 format json patch. +// The merge patch format is primarily intended for use with the HTTP PATCH method +// as a means of describing a set of modifications to a target resource's content. +func GenJSONPatch(old, new interface{}) ([]byte, error) { + oldJSON, err := json.Marshal(old) + if err != nil { + return nil, fmt.Errorf("failed to marshal old object: %v", err) + } + newJSON, err := json.Marshal(new) + if err != nil { + return nil, fmt.Errorf("failed to marshal new object: %v", err) + } + patchBytes, err := jsonpatchv2.CreatePatch(oldJSON, newJSON) + if err != nil { + return nil, fmt.Errorf("failed to generate patch: %v", err) + } + + patchJSON, err := json.Marshal(patchBytes) + if err != nil { + return nil, fmt.Errorf("failed to marshal patch: %v", err) + } + return patchJSON, nil +}