diff --git a/artifacts/deploy/karmada-controller-manager.yaml b/artifacts/deploy/karmada-controller-manager.yaml index 8beb681208a7..7a717080e35b 100644 --- a/artifacts/deploy/karmada-controller-manager.yaml +++ b/artifacts/deploy/karmada-controller-manager.yaml @@ -30,7 +30,7 @@ spec: - --cluster-status-update-frequency=10s - --secure-port=10357 - --failover-eviction-timeout=30s - - --controllers=*,hpaReplicasSyncer + - --controllers=*,hpaAutoLabelRetain,deploymentReplicasSyncer - --feature-gates=PropagationPolicyPreemption=true,MultiClusterService=true - --v=4 livenessProbe: diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go index 75d6089b5edf..93597ff90e72 100644 --- a/cmd/controller-manager/app/controllermanager.go +++ b/cmd/controller-manager/app/controllermanager.go @@ -31,7 +31,6 @@ import ( "k8s.io/client-go/informers" kubeclientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" - "k8s.io/client-go/scale" cliflag "k8s.io/component-base/cli/flag" "k8s.io/component-base/term" "k8s.io/klog/v2" @@ -62,7 +61,7 @@ import ( metricsclient "github.com/karmada-io/karmada/pkg/controllers/federatedhpa/metrics" "github.com/karmada-io/karmada/pkg/controllers/federatedresourcequota" "github.com/karmada-io/karmada/pkg/controllers/gracefuleviction" - "github.com/karmada-io/karmada/pkg/controllers/hpareplicassyncer" + "github.com/karmada-io/karmada/pkg/controllers/hpaautolabelretain" "github.com/karmada-io/karmada/pkg/controllers/mcs" "github.com/karmada-io/karmada/pkg/controllers/multiclusterservice" "github.com/karmada-io/karmada/pkg/controllers/namespace" @@ -70,6 +69,7 @@ import ( "github.com/karmada-io/karmada/pkg/controllers/status" "github.com/karmada-io/karmada/pkg/controllers/unifiedauth" "github.com/karmada-io/karmada/pkg/dependenciesdistributor" + "github.com/karmada-io/karmada/pkg/deploymentreplicassyncer" "github.com/karmada-io/karmada/pkg/detector" "github.com/karmada-io/karmada/pkg/features" "github.com/karmada-io/karmada/pkg/karmadactl/util/apiclient" @@ -206,7 +206,7 @@ func Run(ctx context.Context, opts *options.Options) error { var controllers = make(controllerscontext.Initializers) // controllersDisabledByDefault is the set of controllers which is disabled by default -var controllersDisabledByDefault = sets.New("hpaReplicasSyncer") +var controllersDisabledByDefault = sets.New("hpaAutoLabelRetain", "deploymentReplicasSyncer") func init() { controllers["cluster"] = startClusterController @@ -226,7 +226,8 @@ func init() { controllers["applicationFailover"] = startApplicationFailoverController controllers["federatedHorizontalPodAutoscaler"] = startFederatedHorizontalPodAutoscalerController controllers["cronFederatedHorizontalPodAutoscaler"] = startCronFederatedHorizontalPodAutoscalerController - controllers["hpaReplicasSyncer"] = startHPAReplicasSyncerController + controllers["hpaAutoLabelRetain"] = startHPAReplicasSyncerController + controllers["deploymentReplicasSyncer"] = startDeploymentReplicasSyncerController controllers["multiclusterservice"] = startMCSController controllers["endpointsliceCollect"] = startEndpointSliceCollectController controllers["endpointsliceDispatch"] = startEndpointSliceDispatchController @@ -656,19 +657,23 @@ func startCronFederatedHorizontalPodAutoscalerController(ctx controllerscontext. 
} func startHPAReplicasSyncerController(ctx controllerscontext.Context) (enabled bool, err error) { - scaleKindResolver := scale.NewDiscoveryScaleKindResolver(ctx.KubeClientSet.Discovery()) - scaleClient, err := scale.NewForConfig(ctx.Mgr.GetConfig(), ctx.Mgr.GetRESTMapper(), dynamic.LegacyAPIPathResolverFunc, scaleKindResolver) + hpaAutoLabelRetain := hpaautolabelretain.HPAAutoLabelRetain{ + DynamicClient: ctx.DynamicClientSet, + RESTMapper: ctx.Mgr.GetRESTMapper(), + } + err = hpaAutoLabelRetain.SetupWithManager(ctx.Mgr) if err != nil { return false, err } - hpaReplicasSyncer := hpareplicassyncer.HPAReplicasSyncer{ - Client: ctx.Mgr.GetClient(), - DynamicClient: ctx.DynamicClientSet, - RESTMapper: ctx.Mgr.GetRESTMapper(), - ScaleClient: scaleClient, + return true, nil +} + +func startDeploymentReplicasSyncerController(ctx controllerscontext.Context) (enabled bool, err error) { + deploymentReplicasSyncer := deploymentreplicassyncer.DeploymentReplicasSyncer{ + Client: ctx.Mgr.GetClient(), } - err = hpaReplicasSyncer.SetupWithManager(ctx.Mgr) + err = deploymentReplicasSyncer.SetupWithManager(ctx.Mgr) if err != nil { return false, err } diff --git a/pkg/controllers/hpaautolabelretain/hpa_auto_label_retain_controller.go b/pkg/controllers/hpaautolabelretain/hpa_auto_label_retain_controller.go new file mode 100644 index 000000000000..3b15ceb1b1c7 --- /dev/null +++ b/pkg/controllers/hpaautolabelretain/hpa_auto_label_retain_controller.go @@ -0,0 +1,66 @@ +/* +Copyright 2023 The Karmada Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package hpaautolabelretain + +import ( + "context" + + autoscalingv2 "k8s.io/api/autoscaling/v2" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/client-go/dynamic" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + + "github.com/karmada-io/karmada/pkg/util" +) + +const ( + // ControllerName is the controller name that will be used when reporting events. + ControllerName = "hpa-auto-label-retain" + // scaleRefWorkerNum is the async Worker number + scaleRefWorkerNum = 1 +) + +// HPAAutoLabelRetain is to automatically add `retain-replicas` label to resource template managed by HPA. +type HPAAutoLabelRetain struct { + DynamicClient dynamic.Interface + RESTMapper meta.RESTMapper + + scaleRefWorker util.AsyncWorker +} + +// SetupWithManager creates a controller and register to controller manager. +func (r *HPAAutoLabelRetain) SetupWithManager(mgr controllerruntime.Manager) error { + scaleRefWorkerOptions := util.Options{ + Name: "scale ref worker", + ReconcileFunc: r.reconcileScaleRef, + } + r.scaleRefWorker = util.NewAsyncWorker(scaleRefWorkerOptions) + r.scaleRefWorker.Run(scaleRefWorkerNum, context.Background().Done()) + + return controllerruntime.NewControllerManagedBy(mgr). + Named(ControllerName). + For(&autoscalingv2.HorizontalPodAutoscaler{}, builder.WithPredicates(r)). + Complete(r) +} + +// Reconcile performs a full reconciliation for the object referred to by the Request.
+// The Controller will requeue the Request to be processed again if an error is non-nil or +// Result.Requeue is true, otherwise upon completion it will remove the work from the queue. +func (r *HPAAutoLabelRetain) Reconcile(_ context.Context, _ controllerruntime.Request) (controllerruntime.Result, error) { + return controllerruntime.Result{}, nil +} diff --git a/pkg/controllers/hpareplicassyncer/hpa_replicas_syncer_predicate.go b/pkg/controllers/hpaautolabelretain/hpa_auto_label_retain_predicate.go similarity index 89% rename from pkg/controllers/hpareplicassyncer/hpa_replicas_syncer_predicate.go rename to pkg/controllers/hpaautolabelretain/hpa_auto_label_retain_predicate.go index b93a29b341d8..2c238e1343a5 100644 --- a/pkg/controllers/hpareplicassyncer/hpa_replicas_syncer_predicate.go +++ b/pkg/controllers/hpaautolabelretain/hpa_auto_label_retain_predicate.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package hpareplicassyncer +package hpaautolabelretain import ( autoscalingv2 "k8s.io/api/autoscaling/v2" @@ -25,10 +25,10 @@ import ( policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" ) -var _ predicate.Predicate = &HPAReplicasSyncer{} +var _ predicate.Predicate = &HPAAutoLabelRetain{} // Create implements CreateEvent filter -func (r *HPAReplicasSyncer) Create(e event.CreateEvent) bool { +func (r *HPAAutoLabelRetain) Create(e event.CreateEvent) bool { hpa, ok := e.Object.(*autoscalingv2.HorizontalPodAutoscaler) if !ok { klog.Errorf("create predicates in hpa controller called, but obj is not hpa type") @@ -44,7 +44,7 @@ func (r *HPAReplicasSyncer) Create(e event.CreateEvent) bool { } // Update implements UpdateEvent filter -func (r *HPAReplicasSyncer) Update(e event.UpdateEvent) bool { +func (r *HPAAutoLabelRetain) Update(e event.UpdateEvent) bool { oldHPA, ok := e.ObjectOld.(*autoscalingv2.HorizontalPodAutoscaler) if !ok { klog.Errorf("update predicates in hpa controller called, but old obj is not hpa type") @@ -72,7 +72,7 @@ func (r *HPAReplicasSyncer) Update(e event.UpdateEvent) bool { } // Delete implements DeleteEvent filter -func (r *HPAReplicasSyncer) Delete(e event.DeleteEvent) bool { +func (r *HPAAutoLabelRetain) Delete(e event.DeleteEvent) bool { hpa, ok := e.Object.(*autoscalingv2.HorizontalPodAutoscaler) if !ok { klog.Errorf("delete predicates in hpa controller called, but obj is not hpa type") @@ -86,7 +86,7 @@ func (r *HPAReplicasSyncer) Delete(e event.DeleteEvent) bool { } // Generic implements default GenericEvent filter -func (r *HPAReplicasSyncer) Generic(_ event.GenericEvent) bool { +func (r *HPAAutoLabelRetain) Generic(_ event.GenericEvent) bool { return false } diff --git a/pkg/controllers/hpareplicassyncer/hpa_scale_ref_worker.go b/pkg/controllers/hpaautolabelretain/hpa_scale_ref_worker.go similarity index 94% rename from pkg/controllers/hpareplicassyncer/hpa_scale_ref_worker.go rename to pkg/controllers/hpaautolabelretain/hpa_scale_ref_worker.go index 67d99f05b21f..748d3d5ecad9 100644 --- a/pkg/controllers/hpareplicassyncer/hpa_scale_ref_worker.go +++ b/pkg/controllers/hpaautolabelretain/hpa_scale_ref_worker.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package hpareplicassyncer +package hpaautolabelretain import ( "context" @@ -45,7 +45,7 @@ type labelEvent struct { hpa *autoscalingv2.HorizontalPodAutoscaler } -func (r *HPAReplicasSyncer) reconcileScaleRef(key util.QueueKey) (err error) { +func (r *HPAAutoLabelRetain) reconcileScaleRef(key util.QueueKey) (err error) { event, ok := key.(labelEvent) if !ok { klog.Errorf("Found invalid key when reconciling hpa scale ref: %+v", key) @@ -68,7 +68,7 @@ func (r *HPAReplicasSyncer) reconcileScaleRef(key util.QueueKey) (err error) { return err } -func (r *HPAReplicasSyncer) addHPALabelToScaleRef(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler) error { +func (r *HPAAutoLabelRetain) addHPALabelToScaleRef(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler) error { targetGVK := schema.FromAPIVersionAndKind(hpa.Spec.ScaleTargetRef.APIVersion, hpa.Spec.ScaleTargetRef.Kind) mapping, err := r.RESTMapper.RESTMapping(targetGVK.GroupKind(), targetGVK.Version) if err != nil { @@ -110,7 +110,7 @@ func (r *HPAReplicasSyncer) addHPALabelToScaleRef(ctx context.Context, hpa *auto return nil } -func (r *HPAReplicasSyncer) deleteHPALabelFromScaleRef(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler) error { +func (r *HPAAutoLabelRetain) deleteHPALabelFromScaleRef(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler) error { targetGVK := schema.FromAPIVersionAndKind(hpa.Spec.ScaleTargetRef.APIVersion, hpa.Spec.ScaleTargetRef.Kind) mapping, err := r.RESTMapper.RESTMapping(targetGVK.GroupKind(), targetGVK.Version) if err != nil { diff --git a/pkg/controllers/hpareplicassyncer/hpa_replicas_syncer_controller.go b/pkg/controllers/hpareplicassyncer/hpa_replicas_syncer_controller.go deleted file mode 100644 index 3bb74576adc8..000000000000 --- a/pkg/controllers/hpareplicassyncer/hpa_replicas_syncer_controller.go +++ /dev/null @@ -1,161 +0,0 @@ -/* -Copyright 2023 The Karmada Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package hpareplicassyncer - -import ( - "context" - - autoscalingv1 "k8s.io/api/autoscaling/v1" - autoscalingv2 "k8s.io/api/autoscaling/v2" - apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/client-go/dynamic" - "k8s.io/client-go/scale" - "k8s.io/klog/v2" - controllerruntime "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/builder" - "sigs.k8s.io/controller-runtime/pkg/client" - - "github.com/karmada-io/karmada/pkg/util" -) - -const ( - // ControllerName is the controller name that will be used when reporting events. - ControllerName = "hpa-replicas-syncer" - // scaleRefWorkerNum is the async Worker number - scaleRefWorkerNum = 1 -) - -// HPAReplicasSyncer is to sync replicas from status of HPA to resource template. 
-type HPAReplicasSyncer struct { - Client client.Client - DynamicClient dynamic.Interface - RESTMapper meta.RESTMapper - - ScaleClient scale.ScalesGetter - scaleRefWorker util.AsyncWorker -} - -// SetupWithManager creates a controller and register to controller manager. -func (r *HPAReplicasSyncer) SetupWithManager(mgr controllerruntime.Manager) error { - scaleRefWorkerOptions := util.Options{ - Name: "scale ref worker", - ReconcileFunc: r.reconcileScaleRef, - } - r.scaleRefWorker = util.NewAsyncWorker(scaleRefWorkerOptions) - r.scaleRefWorker.Run(scaleRefWorkerNum, context.Background().Done()) - - return controllerruntime.NewControllerManagedBy(mgr). - Named(ControllerName). - For(&autoscalingv2.HorizontalPodAutoscaler{}, builder.WithPredicates(r)). - Complete(r) -} - -// Reconcile performs a full reconciliation for the object referred to by the Request. -// The Controller will requeue the Request to be processed again if an error is non-nil or -// Result.Requeue is true, otherwise upon completion it will remove the work from the queue. -func (r *HPAReplicasSyncer) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) { - klog.V(4).Infof("Reconciling for HPA %s/%s", req.Namespace, req.Name) - - hpa := &autoscalingv2.HorizontalPodAutoscaler{} - err := r.Client.Get(ctx, req.NamespacedName, hpa) - if err != nil { - if apierrors.IsNotFound(err) { - return controllerruntime.Result{}, nil - } - - return controllerruntime.Result{}, err - } - - workloadGR, scale, err := r.getGroupResourceAndScaleForWorkloadFromHPA(ctx, hpa) - if err != nil { - return controllerruntime.Result{}, err - } - - err = r.updateScaleIfNeed(ctx, workloadGR, scale.DeepCopy(), hpa) - if err != nil { - return controllerruntime.Result{}, err - } - - // TODO(@lxtywypc): Add finalizer for HPA and remove them - // when the HPA is deleting and the replicas have been synced. - - return controllerruntime.Result{}, nil -} - -// getGroupResourceAndScaleForWorkloadFromHPA parses GroupResource and get Scale -// of the workload declared in spec.scaleTargetRef of HPA. -func (r *HPAReplicasSyncer) getGroupResourceAndScaleForWorkloadFromHPA(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, -) (schema.GroupResource, *autoscalingv1.Scale, error) { - gvk := schema.FromAPIVersionAndKind(hpa.Spec.ScaleTargetRef.APIVersion, hpa.Spec.ScaleTargetRef.Kind) - mapping, err := r.RESTMapper.RESTMapping(gvk.GroupKind(), gvk.Version) - if err != nil { - klog.Errorf("Failed to get group resource for resource(kind=%s, %s/%s): %v", - hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, err) - - return schema.GroupResource{}, nil, err - } - - gr := mapping.Resource.GroupResource() - - scale, err := r.ScaleClient.Scales(hpa.Namespace).Get(ctx, gr, hpa.Spec.ScaleTargetRef.Name, metav1.GetOptions{}) - if err != nil { - if apierrors.IsNotFound(err) { - // If the scale of workload is not found, skip processing. - return gr, nil, nil - } - - klog.Errorf("Failed to get scale for resource(kind=%s, %s/%s): %v", - hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, err) - - return schema.GroupResource{}, nil, err - } - - return gr, scale, nil -} - -// updateScaleIfNeed would update the scale of workload on fed-control plane -// if the replicas declared in the workload on karmada-control-plane does not match -// the actual replicas in member clusters effected by HPA. 
-func (r *HPAReplicasSyncer) updateScaleIfNeed(ctx context.Context, workloadGR schema.GroupResource, scale *autoscalingv1.Scale, hpa *autoscalingv2.HorizontalPodAutoscaler) error { - // If the scale of workload is not found, skip processing. - if scale == nil { - klog.V(4).Infof("Scale of resource(kind=%s, %s/%s) not found, the resource might have been removed, skip", - hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name) - - return nil - } - - if scale.Spec.Replicas != hpa.Status.DesiredReplicas { - oldReplicas := scale.Spec.Replicas - - scale.Spec.Replicas = hpa.Status.DesiredReplicas - _, err := r.ScaleClient.Scales(hpa.Namespace).Update(ctx, workloadGR, scale, metav1.UpdateOptions{}) - if err != nil { - klog.Errorf("Failed to try to sync scale for resource(kind=%s, %s/%s) from %d to %d: %v", - hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, oldReplicas, hpa.Status.DesiredReplicas, err) - return err - } - - klog.V(4).Infof("Successfully synced scale for resource(kind=%s, %s/%s) from %d to %d", - hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, oldReplicas, hpa.Status.DesiredReplicas) - } - - return nil -} diff --git a/pkg/controllers/hpareplicassyncer/hpa_replicas_syncer_controller_test.go b/pkg/controllers/hpareplicassyncer/hpa_replicas_syncer_controller_test.go deleted file mode 100644 index bc97741a7f6a..000000000000 --- a/pkg/controllers/hpareplicassyncer/hpa_replicas_syncer_controller_test.go +++ /dev/null @@ -1,341 +0,0 @@ -/* -Copyright 2023 The Karmada Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package hpareplicassyncer - -import ( - "context" - "fmt" - "testing" - - "github.com/stretchr/testify/assert" - appsv1 "k8s.io/api/apps/v1" - autoscalingv1 "k8s.io/api/autoscaling/v1" - autoscalingv2 "k8s.io/api/autoscaling/v2" - "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" - scalefake "k8s.io/client-go/scale/fake" - coretesting "k8s.io/client-go/testing" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/fake" - - workloadv1alpha1 "github.com/karmada-io/karmada/examples/customresourceinterpreter/apis/workload/v1alpha1" - "github.com/karmada-io/karmada/pkg/util/gclient" -) - -func TestGetGroupResourceAndScaleForWorkloadFromHPA(t *testing.T) { - deployment := newDeployment("deployment-1", 1) - workload := newWorkload("workload-1", 1) - syncer := newHPAReplicasSyncer(deployment, workload) - cases := []struct { - name string - hpa *autoscalingv2.HorizontalPodAutoscaler - expectedError bool - expectedScale bool - expectedGR schema.GroupResource - }{ - { - name: "normal case", - hpa: newHPA(appsv1.SchemeGroupVersion.String(), "Deployment", "deployment-1", 0), - expectedError: false, - expectedScale: true, - expectedGR: schema.GroupResource{Group: appsv1.SchemeGroupVersion.Group, Resource: "deployments"}, - }, - { - name: "customized resource case", - hpa: newHPA(workloadv1alpha1.SchemeGroupVersion.String(), "Workload", "workload-1", 0), - expectedError: false, - expectedScale: true, - expectedGR: schema.GroupResource{Group: workloadv1alpha1.SchemeGroupVersion.Group, Resource: "workloads"}, - }, - { - name: "scale not found", - hpa: newHPA(appsv1.SchemeGroupVersion.String(), "Deployment", "deployment-2", 0), - expectedError: false, - expectedScale: false, - expectedGR: schema.GroupResource{Group: appsv1.SchemeGroupVersion.Group, Resource: "deployments"}, - }, - { - name: "resource not registered", - hpa: newHPA("fake/v1", "FakeWorkload", "fake-workload-1", 0), - expectedError: true, - expectedScale: false, - expectedGR: schema.GroupResource{}, - }, - } - - for _, tt := range cases { - t.Run(tt.name, func(t *testing.T) { - gr, scale, err := syncer.getGroupResourceAndScaleForWorkloadFromHPA(context.TODO(), tt.hpa) - - if tt.expectedError { - assert.NotEmpty(t, err) - return - } - assert.Empty(t, err) - - if tt.expectedScale { - assert.NotEmpty(t, scale) - } else { - assert.Empty(t, scale) - } - - assert.Equal(t, tt.expectedGR, gr) - }) - } -} - -func TestUpdateScaleIfNeed(t *testing.T) { - cases := []struct { - name string - object client.Object - gr schema.GroupResource - scale *autoscalingv1.Scale - hpa *autoscalingv2.HorizontalPodAutoscaler - expectedError bool - }{ - { - name: "normal case", - object: newDeployment("deployment-1", 0), - gr: schema.GroupResource{Group: appsv1.SchemeGroupVersion.Group, Resource: "deployments"}, - scale: newScale("deployment-1", 0), - hpa: newHPA(appsv1.SchemeGroupVersion.String(), "Deployment", "deployment-1", 3), - expectedError: false, - }, - { - name: "custom resource case", - object: newWorkload("workload-1", 0), - gr: schema.GroupResource{Group: workloadv1alpha1.SchemeGroupVersion.Group, Resource: "workloads"}, - scale: newScale("workload-1", 0), - hpa: newHPA(workloadv1alpha1.SchemeGroupVersion.String(), "Workload", "workload-1", 3), - expectedError: false, - }, - { - name: "scale not found", - object: 
newDeployment("deployment-1", 0), - gr: schema.GroupResource{Group: "fake", Resource: "fakeworkloads"}, - scale: newScale("fake-workload-1", 0), - hpa: newHPA("fake/v1", "FakeWorkload", "fake-workload-1", 3), - expectedError: true, - }, - } - - for _, tt := range cases { - t.Run(tt.name, func(t *testing.T) { - syncer := newHPAReplicasSyncer(tt.object) - err := syncer.updateScaleIfNeed(context.TODO(), tt.gr, tt.scale, tt.hpa) - if tt.expectedError { - assert.NotEmpty(t, err) - return - } - assert.Empty(t, err) - - obj := &unstructured.Unstructured{} - obj.SetAPIVersion(tt.hpa.Spec.ScaleTargetRef.APIVersion) - obj.SetKind(tt.hpa.Spec.ScaleTargetRef.Kind) - - err = syncer.Client.Get(context.TODO(), types.NamespacedName{Namespace: tt.scale.Namespace, Name: tt.scale.Name}, obj) - assert.Empty(t, err) - if err != nil { - return - } - - scale, err := getScaleFromUnstructured(obj) - assert.Empty(t, err) - if err != nil { - return - } - - assert.Equal(t, tt.hpa.Status.DesiredReplicas, scale.Spec.Replicas) - }) - } -} - -func newHPAReplicasSyncer(objs ...client.Object) *HPAReplicasSyncer { - scheme := gclient.NewSchema() - _ = workloadv1alpha1.AddToScheme(scheme) - - fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).Build() - fakeMapper := newMapper() - fakeScaleClient := &scalefake.FakeScaleClient{} - - fakeScaleClient.AddReactor("get", "*", reactionFuncForGetting(fakeClient, fakeMapper)) - fakeScaleClient.AddReactor("update", "*", reactionFuncForUpdating(fakeClient, fakeMapper)) - - return &HPAReplicasSyncer{ - Client: fakeClient, - RESTMapper: fakeMapper, - ScaleClient: fakeScaleClient, - } -} - -func reactionFuncForGetting(c client.Client, mapper meta.RESTMapper) coretesting.ReactionFunc { - return func(action coretesting.Action) (bool, runtime.Object, error) { - getAction, ok := action.(coretesting.GetAction) - if !ok { - return false, nil, fmt.Errorf("not GET Action") - } - - obj, err := newUnstructured(getAction.GetResource(), mapper) - if err != nil { - return true, nil, err - } - - nn := types.NamespacedName{Namespace: getAction.GetNamespace(), Name: getAction.GetName()} - err = c.Get(context.TODO(), nn, obj) - if err != nil { - return true, nil, err - } - - scale, err := getScaleFromUnstructured(obj) - - return true, scale, err - } -} - -func newUnstructured(gvr schema.GroupVersionResource, mapper meta.RESTMapper) (*unstructured.Unstructured, error) { - gvk, err := mapper.KindFor(gvr) - if err != nil { - return nil, err - } - - un := &unstructured.Unstructured{} - un.SetGroupVersionKind(gvk) - - return un, nil -} - -func getScaleFromUnstructured(obj *unstructured.Unstructured) (*autoscalingv1.Scale, error) { - replicas := int32(0) - spec, ok := obj.Object["spec"].(map[string]interface{}) - if ok { - replicas = int32(spec["replicas"].(int64)) - } - - return &autoscalingv1.Scale{ - Spec: autoscalingv1.ScaleSpec{ - Replicas: replicas, - }, - Status: autoscalingv1.ScaleStatus{ - Replicas: replicas, - }, - }, nil -} - -func reactionFuncForUpdating(c client.Client, mapper meta.RESTMapper) coretesting.ReactionFunc { - return func(action coretesting.Action) (bool, runtime.Object, error) { - updateAction, ok := action.(coretesting.UpdateAction) - if !ok { - return false, nil, fmt.Errorf("not UPDATE Action") - } - - scale, ok := updateAction.GetObject().(*autoscalingv1.Scale) - if !ok { - return false, nil, fmt.Errorf("not autoscalingv1.Scale Object") - } - - obj, err := newUnstructured(updateAction.GetResource(), mapper) - if err != nil { - return true, nil, err - } - - 
nn := types.NamespacedName{Namespace: scale.Namespace, Name: scale.Name} - err = c.Get(context.TODO(), nn, obj) - if err != nil { - return true, nil, err - } - - updateScaleForUnstructured(obj, scale) - - return true, scale, c.Update(context.TODO(), obj) - } -} - -func updateScaleForUnstructured(obj *unstructured.Unstructured, scale *autoscalingv1.Scale) { - spec, ok := obj.Object["spec"].(map[string]interface{}) - if !ok { - spec = map[string]interface{}{} - obj.Object["spec"] = spec - } - - spec["replicas"] = scale.Spec.Replicas -} - -func newMapper() meta.RESTMapper { - m := meta.NewDefaultRESTMapper([]schema.GroupVersion{}) - m.Add(appsv1.SchemeGroupVersion.WithKind("Deployment"), meta.RESTScopeNamespace) - m.Add(workloadv1alpha1.SchemeGroupVersion.WithKind("Workload"), meta.RESTScopeNamespace) - return m -} - -func newDeployment(name string, replicas int32) *appsv1.Deployment { - return &appsv1.Deployment{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: "default", - }, - Spec: appsv1.DeploymentSpec{ - Replicas: &replicas, - }, - } -} - -func newWorkload(name string, replicas int32) *workloadv1alpha1.Workload { - return &workloadv1alpha1.Workload{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: "default", - }, - Spec: workloadv1alpha1.WorkloadSpec{ - Replicas: &replicas, - }, - } -} - -func newHPA(apiVersion, kind, name string, replicas int32) *autoscalingv2.HorizontalPodAutoscaler { - return &autoscalingv2.HorizontalPodAutoscaler{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: "default", - }, - Spec: autoscalingv2.HorizontalPodAutoscalerSpec{ - ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{ - APIVersion: apiVersion, - Kind: kind, - Name: name, - }, - }, - Status: autoscalingv2.HorizontalPodAutoscalerStatus{ - DesiredReplicas: replicas, - }, - } -} - -func newScale(name string, replicas int32) *autoscalingv1.Scale { - return &autoscalingv1.Scale{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: "default", - }, - Spec: autoscalingv1.ScaleSpec{ - Replicas: replicas, - }, - } -} diff --git a/pkg/deploymentreplicassyncer/deployment_replicas_syncer_controller.go b/pkg/deploymentreplicassyncer/deployment_replicas_syncer_controller.go new file mode 100644 index 000000000000..e3943e8d9330 --- /dev/null +++ b/pkg/deploymentreplicassyncer/deployment_replicas_syncer_controller.go @@ -0,0 +1,162 @@ +/* +Copyright 2024 The Karmada Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package deploymentreplicassyncer + +import ( + "context" + "fmt" + + appsv1 "k8s.io/api/apps/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/klog/v2" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" + "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" + "github.com/karmada-io/karmada/pkg/util" + "github.com/karmada-io/karmada/pkg/util/names" +) + +const ( + // ControllerName is the controller name that will be used when reporting events. + ControllerName = "deployment-replicas-syncer" +) + +// DeploymentReplicasSyncer is to sync deployment replicas from status field to spec field. +type DeploymentReplicasSyncer struct { + Client client.Client +} + +var predicateFunc = predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { return false }, + UpdateFunc: func(e event.UpdateEvent) bool { + oldObj := e.ObjectOld.(*appsv1.Deployment) + newObj := e.ObjectNew.(*appsv1.Deployment) + + // if the deployment is not labeled `retain-replicas`, it is not controlled by HPA, so ignore the event + retainReplicasLabel := util.GetLabelValue(newObj.GetLabels(), util.RetainReplicasLabel) + if retainReplicasLabel != util.RetainReplicasValue { + return false + } + + if oldObj.Spec.Replicas == nil || newObj.Spec.Replicas == nil { + klog.Errorf("spec.replicas field unexpectedly became nil: %+v, %+v", oldObj.Spec.Replicas, newObj.Spec.Replicas) + return false + } + + // if the replicas field has not changed in either spec.replicas or status.replicas + if *oldObj.Spec.Replicas == *newObj.Spec.Replicas && oldObj.Status.Replicas == newObj.Status.Replicas { + return false + } + + // if replicas in spec is already the same as in status + if *newObj.Spec.Replicas == newObj.Status.Replicas { + return false + } + + return true + }, + DeleteFunc: func(event.DeleteEvent) bool { return false }, + GenericFunc: func(event.GenericEvent) bool { return false }, +} + +// SetupWithManager creates a controller and register to controller manager. +func (r *DeploymentReplicasSyncer) SetupWithManager(mgr controllerruntime.Manager) error { + return controllerruntime.NewControllerManagedBy(mgr). + Named(ControllerName). + For(&appsv1.Deployment{}, builder.WithPredicates(predicateFunc)).
+ Complete(r) +} + +func (r *DeploymentReplicasSyncer) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) { + klog.V(4).Infof("Reconciling for Deployment %s/%s", req.Namespace, req.Name) + + deployment := &appsv1.Deployment{} + binding := &v1alpha2.ResourceBinding{} + bindingName := names.GenerateBindingName(util.DeploymentKind, req.Name) + + if err := r.Client.Get(ctx, client.ObjectKey{Namespace: req.Namespace, Name: bindingName}, binding); err != nil { + if apierrors.IsNotFound(err) { + klog.Infof("no need to update deployment replicas for binding not found") + return controllerruntime.Result{}, nil + } + return controllerruntime.Result{}, err + } + + // if it is not the divided replica scheduling type, no need to update replicas + if binding.Spec.Placement.ReplicaSchedulingType() != policyv1alpha1.ReplicaSchedulingTypeDivided { + return controllerruntime.Result{}, nil + } + + if err := r.Client.Get(ctx, req.NamespacedName, deployment); err != nil { + if apierrors.IsNotFound(err) { + klog.Infof("no need to update deployment replicas for deployment not found") + return controllerruntime.Result{}, nil + } + return controllerruntime.Result{}, err + } + + // if replicas in spec is already the same as in status, no need to update replicas + if deployment.Spec.Replicas != nil && *deployment.Spec.Replicas == deployment.Status.Replicas { + klog.Infof("replicas in spec field (%d) already equal to replicas in status field (%d)", *deployment.Spec.Replicas, deployment.Status.Replicas) + return controllerruntime.Result{}, nil + } + + // make sure the replicas change in deployment.spec has been synced to binding.spec, otherwise retry + if deployment.Spec.Replicas == nil || *deployment.Spec.Replicas != binding.Spec.Replicas { + klog.Errorf("wait until replicas of binding (%d) equals replicas of deployment (%d)", + binding.Spec.Replicas, *deployment.Spec.Replicas) + return controllerruntime.Result{}, fmt.Errorf("retry to wait for replicas change to sync to binding") + } + + // make sure the scheduler observed generation equals the generation in binding, otherwise retry + if binding.Generation != binding.Status.SchedulerObservedGeneration { + klog.Errorf("wait until scheduler observed generation (%d) equals generation in binding (%d)", + binding.Status.SchedulerObservedGeneration, binding.Generation) + return controllerruntime.Result{}, fmt.Errorf("retry to wait for scheduler observed generation") + } + + if len(binding.Status.AggregatedStatus) != len(binding.Spec.Clusters) { + klog.Errorf("wait until status of all clusters is collected, got: %d, expected: %d", + len(binding.Status.AggregatedStatus), len(binding.Spec.Clusters)) + return controllerruntime.Result{}, fmt.Errorf("retry to wait for status in binding to be collected") + } + for _, status := range binding.Status.AggregatedStatus { + if status.Status == nil { + klog.Errorf("wait until aggregated status of cluster %s is collected", status.ClusterName) + return controllerruntime.Result{}, fmt.Errorf("retry to wait for status in binding to be collected") + } + } + klog.Infof("status of all %d clusters collected successfully", len(binding.Status.AggregatedStatus)) + + // update replicas + oldReplicas := *deployment.Spec.Replicas + deployment.Spec.Replicas = &deployment.Status.Replicas + if err := r.Client.Update(ctx, deployment); err != nil { + klog.Errorf("failed to update deployment (%s/%s) replicas: %+v", deployment.Namespace, deployment.Name, err) + return controllerruntime.Result{}, err + } + + klog.Infof("successfully updated deployment (%s/%s) replicas from %d to %d",
deployment.Namespace, + deployment.Name, oldReplicas, deployment.Status.Replicas) + + return controllerruntime.Result{}, nil +}
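For reference, below is a minimal sketch of how the update predicate of the new deploymentReplicasSyncer could be exercised in a unit test. It only relies on the `predicateFunc` variable and the `util.RetainReplicasLabel`/`util.RetainReplicasValue` constants already referenced by the controller; the `newLabeledDeployment`/`int32Ptr` helpers, the fixture values, and the test itself are illustrative and not part of this change.

package deploymentreplicassyncer

import (
	"testing"

	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/event"

	"github.com/karmada-io/karmada/pkg/util"
)

func int32Ptr(i int32) *int32 { return &i }

// newLabeledDeployment returns a Deployment carrying the retain-replicas label
// with the given spec.replicas and status.replicas.
func newLabeledDeployment(specReplicas, statusReplicas int32) *appsv1.Deployment {
	return &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "nginx",
			Namespace: "default",
			Labels:    map[string]string{util.RetainReplicasLabel: util.RetainReplicasValue},
		},
		Spec:   appsv1.DeploymentSpec{Replicas: int32Ptr(specReplicas)},
		Status: appsv1.DeploymentStatus{Replicas: statusReplicas},
	}
}

func TestPredicateFuncOnUpdate(t *testing.T) {
	cases := []struct {
		name     string
		oldObj   *appsv1.Deployment
		newObj   *appsv1.Deployment
		expected bool
	}{
		{
			// status drifted away from spec (e.g. member-cluster HPA scaled out): reconcile.
			name:     "status replicas changed while spec stays behind",
			oldObj:   newLabeledDeployment(2, 2),
			newObj:   newLabeledDeployment(2, 4),
			expected: true,
		},
		{
			// spec already equals status: nothing left to sync back.
			name:     "spec replicas already equal to status replicas",
			oldObj:   newLabeledDeployment(2, 4),
			newObj:   newLabeledDeployment(4, 4),
			expected: false,
		},
	}

	for _, tt := range cases {
		t.Run(tt.name, func(t *testing.T) {
			got := predicateFunc.Update(event.UpdateEvent{ObjectOld: tt.oldObj, ObjectNew: tt.newObj})
			if got != tt.expected {
				t.Errorf("expected %v, got %v", tt.expected, got)
			}
		})
	}
}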