From 546b2020c5b5174e6e095d778560b6d9ef7599a0 Mon Sep 17 00:00:00 2001
From: lxtywypc
Date: Mon, 18 Sep 2023 10:00:43 +0800
Subject: [PATCH] add replicas syncer for resources with HPA

Signed-off-by: lxtywypc
---
 .../app/controllermanager.go          |  35 ++++++
 pkg/replicassyncer/controller.go      | 102 ++++++++++++++++++
 pkg/replicassyncer/controller_test.go |   3 +
 pkg/replicassyncer/utils.go           |  72 +++++++++++++
 pkg/replicassyncer/utils_test.go      |   3 +
 pkg/replicassyncer/webhook.go         |  71 ++++++++++++
 pkg/replicassyncer/webhook_test.go    |   3 +
 7 files changed, 289 insertions(+)
 create mode 100644 pkg/replicassyncer/controller.go
 create mode 100644 pkg/replicassyncer/controller_test.go
 create mode 100644 pkg/replicassyncer/utils.go
 create mode 100644 pkg/replicassyncer/utils_test.go
 create mode 100644 pkg/replicassyncer/webhook.go
 create mode 100644 pkg/replicassyncer/webhook_test.go

diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go
index f8cd45e16a0c..726536844bdf 100644
--- a/cmd/controller-manager/app/controllermanager.go
+++ b/cmd/controller-manager/app/controllermanager.go
@@ -15,6 +15,7 @@ import (
 	"k8s.io/client-go/informers"
 	kubeclientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
+	"k8s.io/client-go/scale"
 	cliflag "k8s.io/component-base/cli/flag"
 	"k8s.io/component-base/term"
 	"k8s.io/klog/v2"
@@ -23,11 +24,13 @@ import (
 	"k8s.io/metrics/pkg/client/external_metrics"
 	controllerruntime "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/cache"
+	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
 	"sigs.k8s.io/controller-runtime/pkg/config"
 	"sigs.k8s.io/controller-runtime/pkg/event"
 	"sigs.k8s.io/controller-runtime/pkg/healthz"
 	crtlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
 	"sigs.k8s.io/controller-runtime/pkg/predicate"
+	"sigs.k8s.io/controller-runtime/pkg/webhook"
 
 	"github.com/karmada-io/karmada/cmd/controller-manager/app/options"
 	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
@@ -53,6 +56,7 @@ import (
 	"github.com/karmada-io/karmada/pkg/features"
 	"github.com/karmada-io/karmada/pkg/karmadactl/util/apiclient"
 	"github.com/karmada-io/karmada/pkg/metrics"
+	"github.com/karmada-io/karmada/pkg/replicassyncer"
 	"github.com/karmada-io/karmada/pkg/resourceinterpreter"
 	"github.com/karmada-io/karmada/pkg/sharedcli"
 	"github.com/karmada-io/karmada/pkg/sharedcli/klogflag"
@@ -205,6 +209,7 @@ func init() {
 	controllers["applicationFailover"] = startApplicationFailoverController
 	controllers["federatedHorizontalPodAutoscaler"] = startFederatedHorizontalPodAutoscalerController
 	controllers["cronFederatedHorizontalPodAutoscaler"] = startCronFederatedHorizontalPodAutoscalerController
+	controllers["replicasSyncer"] = startReplicasSyncerController
 }
 
 func startClusterController(ctx controllerscontext.Context) (enabled bool, err error) {
@@ -591,6 +596,36 @@ func startCronFederatedHorizontalPodAutoscalerController(ctx controllerscontext.
 	return true, nil
 }
 
+func startReplicasSyncerController(ctx controllerscontext.Context) (enabled bool, err error) {
+	hpaClient := kubeclientset.NewForConfigOrDie(ctx.Mgr.GetConfig())
+	scaleKindResolver := scale.NewDiscoveryScaleKindResolver(hpaClient.Discovery())
+	httpClient, err := rest.HTTPClientFor(ctx.Mgr.GetConfig())
+	if err != nil {
+		return false, err
+	}
+
+	mapper, err := apiutil.NewDiscoveryRESTMapper(ctx.Mgr.GetConfig(), httpClient)
+	if err != nil {
+		return false, err
+	}
+
+	scaleClient, err := scale.NewForConfig(ctx.Mgr.GetConfig(), mapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
+	if err != nil {
+		return false, err
+	}
+
+	replicasSyncer := replicassyncer.NewReplicasSyncer(ctx.Mgr.GetClient(), mapper, scaleClient)
+	err = replicasSyncer.SetupWithManager(ctx.Mgr)
+	if err != nil {
+		return false, err
+	}
+
+	ctx.Mgr.GetWebhookServer().Register("/hpa-update-check",
+		&webhook.Admission{Handler: replicassyncer.NewHPAUpdateWebhook(ctx.Mgr.GetClient(), mapper, scaleClient, replicasSyncer.GenericChan)})
+
+	return true, nil
+}
+
 // setupControllers initialize controllers and setup one by one.
 func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) {
 	restConfig := mgr.GetConfig()
diff --git a/pkg/replicassyncer/controller.go b/pkg/replicassyncer/controller.go
new file mode 100644
index 000000000000..7b3eb8c970aa
--- /dev/null
+++ b/pkg/replicassyncer/controller.go
@@ -0,0 +1,102 @@
+package replicassyncer
+
+import (
+	"context"
+
+	autoscalingv1 "k8s.io/api/autoscaling/v1"
+	autoscalingv2 "k8s.io/api/autoscaling/v2"
+	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/scale"
+	"k8s.io/klog/v2"
+	controllerruntime "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/builder"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/event"
+	"sigs.k8s.io/controller-runtime/pkg/handler"
+	"sigs.k8s.io/controller-runtime/pkg/source"
+)
+
+type ReplicasSyncer struct {
+	hostClient  client.Client
+	mapper      meta.RESTMapper
+	scaleClient scale.ScalesGetter
+
+	GenericChan chan<- event.GenericEvent
+}
+
+func NewReplicasSyncer(hostClient client.Client, mapper meta.RESTMapper, scaleClient scale.ScalesGetter) *ReplicasSyncer {
+	return &ReplicasSyncer{
+		hostClient:  hostClient,
+		mapper:      mapper,
+		scaleClient: scaleClient,
+	}
+}
+
+func (r *ReplicasSyncer) SetupWithManager(mgr controllerruntime.Manager) error {
+	ch := make(chan event.GenericEvent)
+	r.GenericChan = ch
+	return controllerruntime.NewControllerManagedBy(mgr).Named("replicas-syncer").
+		For(&autoscalingv2.HorizontalPodAutoscaler{},
+			builder.WithPredicates(HPAPredicate)).
+		WatchesRawSource(&source.Channel{Source: ch}, &handler.EnqueueRequestForObject{}, builder.WithPredicates(HPAPredicate)).
+		Complete(r)
+}
+
+func (r *ReplicasSyncer) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) {
+	klog.V(4).Infof("Reconciling for HPA %s/%s", req.Namespace, req.Name)
+
+	hpa := &autoscalingv2.HorizontalPodAutoscaler{}
+	err := r.hostClient.Get(ctx, req.NamespacedName, hpa)
+	if err != nil {
+		if errors.IsNotFound(err) {
+			return controllerruntime.Result{}, nil
+		}
+
+		return controllerruntime.Result{}, err
+	}
+
+	workloadGR, scale, err := GetGroupResourceAndScaleForWorkloadFromHPA(ctx, r.mapper, r.scaleClient, hpa)
+	if err != nil {
+		return controllerruntime.Result{}, err
+	}
+
+	// If the scale of the workload is not found, skip processing.
+	if scale == nil {
+		klog.Infof("Scale of resource(kind=%s, %s/%s) not found, the resource might have been removed, skip",
+			hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name)
+
+		return controllerruntime.Result{}, nil
+	}
+
+	err = r.updateScaleIfNeed(ctx, workloadGR, scale, hpa)
+	if err != nil {
+		return controllerruntime.Result{}, err
+	}
+
+	return controllerruntime.Result{}, nil
+}
+
+// updateScaleIfNeed updates the scale of the workload on the federation control plane
+// if the replicas declared in the workload on the control plane do not match
+// the actual replicas in member clusters affected by the HPA.
+func (r *ReplicasSyncer) updateScaleIfNeed(ctx context.Context, workloadGR schema.GroupResource, scale *autoscalingv1.Scale, hpa *autoscalingv2.HorizontalPodAutoscaler) error {
+	if scale.Spec.Replicas != hpa.Status.CurrentReplicas {
+		oldReplicas := scale.Spec.Replicas
+
+		scale.Spec.Replicas = hpa.Status.CurrentReplicas
+		_, err := r.scaleClient.Scales(hpa.Namespace).Update(ctx, workloadGR, scale, metav1.UpdateOptions{})
+		if err != nil {
+			klog.Errorf("Failed to sync scale for resource(kind=%s, %s/%s) from %d to %d: %v",
+				hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, oldReplicas, hpa.Status.CurrentReplicas, err)
+			return err
+		}
+
+		klog.Infof("Successfully synced scale for resource(kind=%s, %s/%s) from %d to %d",
+			hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, oldReplicas, hpa.Status.CurrentReplicas)
+	}
+
+	return nil
+}
diff --git a/pkg/replicassyncer/controller_test.go b/pkg/replicassyncer/controller_test.go
new file mode 100644
index 000000000000..11394aafbb81
--- /dev/null
+++ b/pkg/replicassyncer/controller_test.go
@@ -0,0 +1,3 @@
+package replicassyncer
+
+// TODO(@lxtywypc): Add unit test.
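Note: controller_test.go above is left as a TODO stub. As a starting point, a minimal sketch for updateScaleIfNeed is given below; it assumes the fake scale client from k8s.io/client-go/scale/fake, and the test name and fixture values ("default"/"demo") are illustrative, not part of this patch.

package replicassyncer

import (
	"context"
	"testing"

	autoscalingv1 "k8s.io/api/autoscaling/v1"
	autoscalingv2 "k8s.io/api/autoscaling/v2"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	scalefake "k8s.io/client-go/scale/fake"
	coretesting "k8s.io/client-go/testing"
)

// TestUpdateScaleIfNeed is a sketch: it drives updateScaleIfNeed with a fake
// scale client and checks that the control-plane scale is raised to the HPA's
// observed replicas.
func TestUpdateScaleIfNeed(t *testing.T) {
	var updated *autoscalingv1.Scale
	scaleClient := &scalefake.FakeScaleClient{}
	scaleClient.AddReactor("update", "deployments", func(action coretesting.Action) (bool, runtime.Object, error) {
		// Capture the scale object sent through the scale client.
		updated = action.(coretesting.UpdateAction).GetObject().(*autoscalingv1.Scale)
		return true, updated, nil
	})

	// hostClient and mapper are not used by updateScaleIfNeed, so nil is fine here.
	syncer := NewReplicasSyncer(nil, nil, scaleClient)

	hpa := &autoscalingv2.HorizontalPodAutoscaler{
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "demo"},
		Spec: autoscalingv2.HorizontalPodAutoscalerSpec{
			ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{APIVersion: "apps/v1", Kind: "Deployment", Name: "demo"},
		},
		Status: autoscalingv2.HorizontalPodAutoscalerStatus{CurrentReplicas: 3},
	}
	scale := &autoscalingv1.Scale{
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "demo"},
		Spec:       autoscalingv1.ScaleSpec{Replicas: 1},
	}

	gr := schema.GroupResource{Group: "apps", Resource: "deployments"}
	if err := syncer.updateScaleIfNeed(context.TODO(), gr, scale, hpa); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if updated == nil || updated.Spec.Replicas != 3 {
		t.Fatalf("expected scale to be updated to 3 replicas, got %+v", updated)
	}
}

The reactor simply records the Update call issued through the scale client, so the sync logic can be exercised without a running control plane.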
diff --git a/pkg/replicassyncer/utils.go b/pkg/replicassyncer/utils.go
new file mode 100644
index 000000000000..7b152bce6a6c
--- /dev/null
+++ b/pkg/replicassyncer/utils.go
@@ -0,0 +1,72 @@
+package replicassyncer
+
+import (
+	"context"
+
+	autoscalingv1 "k8s.io/api/autoscaling/v1"
+	autoscalingv2 "k8s.io/api/autoscaling/v2"
+	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/scale"
+	"k8s.io/klog/v2"
+	"sigs.k8s.io/controller-runtime/pkg/event"
+	"sigs.k8s.io/controller-runtime/pkg/predicate"
+)
+
+const KindHPA = "HorizontalPodAutoscaler"
+
+var HPAPredicate = predicate.Funcs{
+	CreateFunc: func(e event.CreateEvent) bool {
+		return false
+	},
+	UpdateFunc: func(e event.UpdateEvent) bool {
+		oldHPA, ok := e.ObjectOld.(*autoscalingv2.HorizontalPodAutoscaler)
+		if !ok {
+			return false
+		}
+
+		newHPA, ok := e.ObjectNew.(*autoscalingv2.HorizontalPodAutoscaler)
+		if !ok {
+			return false
+		}
+
+		return oldHPA.Status.CurrentReplicas != newHPA.Status.CurrentReplicas
+	},
+	DeleteFunc: func(e event.DeleteEvent) bool {
+		return true
+	},
+	GenericFunc: func(e event.GenericEvent) bool {
+		return e.Object.GetObjectKind().GroupVersionKind().Kind == KindHPA
+	},
+}
+
+func GetGroupResourceAndScaleForWorkloadFromHPA(ctx context.Context, mapper meta.RESTMapper, scaleClient scale.ScalesGetter, hpa *autoscalingv2.HorizontalPodAutoscaler,
+) (schema.GroupResource, *autoscalingv1.Scale, error) {
+	gvk := schema.FromAPIVersionAndKind(hpa.Spec.ScaleTargetRef.APIVersion, hpa.Spec.ScaleTargetRef.Kind)
+	mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
+	if err != nil {
+		klog.Errorf("Failed to get group resource for resource(kind=%s, %s/%s): %v",
+			hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, err)
+
+		return schema.GroupResource{}, nil, err
+	}
+
+	gr := mapping.Resource.GroupResource()
+
+	scale, err := scaleClient.Scales(hpa.Namespace).Get(ctx, gr, hpa.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
+	if err != nil {
+		if errors.IsNotFound(err) {
+			// If the scale of the workload is not found, skip processing.
+			return gr, nil, nil
+		}
+
+		klog.Errorf("Failed to get scale for resource(kind=%s, %s/%s): %v",
+			hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, err)
+
+		return schema.GroupResource{}, nil, err
+	}
+
+	return gr, scale, nil
+}
diff --git a/pkg/replicassyncer/utils_test.go b/pkg/replicassyncer/utils_test.go
new file mode 100644
index 000000000000..11394aafbb81
--- /dev/null
+++ b/pkg/replicassyncer/utils_test.go
@@ -0,0 +1,3 @@
+package replicassyncer
+
+// TODO(@lxtywypc): Add unit test.
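Note: utils_test.go is likewise only a stub. A small sketch for the update branch of HPAPredicate could look like the following; it needs no fakes, and the replica counts are arbitrary.

package replicassyncer

import (
	"testing"

	autoscalingv2 "k8s.io/api/autoscaling/v2"
	"sigs.k8s.io/controller-runtime/pkg/event"
)

// TestHPAPredicateUpdate is a sketch: HPA update events should only pass the
// predicate when Status.CurrentReplicas changes.
func TestHPAPredicateUpdate(t *testing.T) {
	oldHPA := &autoscalingv2.HorizontalPodAutoscaler{
		Status: autoscalingv2.HorizontalPodAutoscalerStatus{CurrentReplicas: 2},
	}
	newHPA := &autoscalingv2.HorizontalPodAutoscaler{
		Status: autoscalingv2.HorizontalPodAutoscalerStatus{CurrentReplicas: 3},
	}

	if !HPAPredicate.Update(event.UpdateEvent{ObjectOld: oldHPA, ObjectNew: newHPA}) {
		t.Errorf("expected the update event to pass the predicate when CurrentReplicas changes")
	}

	newHPA.Status.CurrentReplicas = oldHPA.Status.CurrentReplicas
	if HPAPredicate.Update(event.UpdateEvent{ObjectOld: oldHPA, ObjectNew: newHPA}) {
		t.Errorf("expected the update event to be filtered out when CurrentReplicas is unchanged")
	}
}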
diff --git a/pkg/replicassyncer/webhook.go b/pkg/replicassyncer/webhook.go
new file mode 100644
index 000000000000..38174d423703
--- /dev/null
+++ b/pkg/replicassyncer/webhook.go
@@ -0,0 +1,71 @@
+package replicassyncer
+
+import (
+	"context"
+	"net/http"
+
+	admissionv1 "k8s.io/api/admission/v1"
+	autoscalingv2 "k8s.io/api/autoscaling/v2"
+	"k8s.io/apimachinery/pkg/api/meta"
+	"k8s.io/client-go/scale"
+	"k8s.io/klog/v2"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/event"
+	"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
+)
+
+type HPAUpdaterWebhook struct {
+	hostClient  client.Client
+	decoder     *admission.Decoder
+	mapper      meta.RESTMapper
+	scaleClient scale.ScalesGetter
+	genericChan chan<- event.GenericEvent
+}
+
+func NewHPAUpdateWebhook(hostClient client.Client, mapper meta.RESTMapper, scaleClient scale.ScalesGetter, genericChan chan<- event.GenericEvent) *HPAUpdaterWebhook {
+	return &HPAUpdaterWebhook{
+		hostClient:  hostClient,
+		mapper:      mapper,
+		scaleClient: scaleClient,
+		genericChan: genericChan,
+	}
+}
+
+func (c *HPAUpdaterWebhook) Handle(ctx context.Context, req admission.Request) admission.Response {
+	if req.Kind.Kind != KindHPA {
+		klog.Warningf("Unsupported object kind %s for %s/%s, skip.", req.Kind.Kind, req.Namespace, req.Name)
+		return admission.Allowed("")
+	}
+
+	oldHPA := &autoscalingv2.HorizontalPodAutoscaler{}
+	err := c.decoder.DecodeRaw(req.OldObject, oldHPA)
+	if err != nil {
+		klog.Errorf("Failed to decode object %s/%s: %v", req.Namespace, req.Name, err)
+		return admission.Errored(http.StatusInternalServerError, err)
+	}
+
+	switch req.Operation {
+	case admissionv1.Delete:
+		// When an HPA is being deleted, block the deletion until the workload replicas have been synced; a nil scale means the workload is already gone.
+		_, scale, err := GetGroupResourceAndScaleForWorkloadFromHPA(ctx, c.mapper, c.scaleClient, oldHPA)
+		if err != nil {
+			klog.Errorf("Denied %s operation on HPA %s/%s: %v.", req.Operation, req.Namespace, req.Name, err)
+			return admission.Errored(http.StatusInternalServerError, err)
+		}
+
+		if scale != nil && scale.Spec.Replicas != oldHPA.Status.CurrentReplicas {
+			go func() {
+				c.genericChan <- event.GenericEvent{Object: oldHPA}
+			}()
+
+			return admission.Denied("member cluster replicas have not been synced yet")
+		}
+	}
+
+	return admission.Allowed("")
+}
+
+func (c *HPAUpdaterWebhook) InjectDecoder(d *admission.Decoder) error {
+	c.decoder = d
+	return nil
+}
diff --git a/pkg/replicassyncer/webhook_test.go b/pkg/replicassyncer/webhook_test.go
new file mode 100644
index 000000000000..11394aafbb81
--- /dev/null
+++ b/pkg/replicassyncer/webhook_test.go
@@ -0,0 +1,3 @@
+package replicassyncer
+
+// TODO(@lxtywypc): Add unit test.
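Note: webhook_test.go is also a stub. One easy first case is the short-circuit for non-HPA kinds, which needs neither a decoder nor fake clients; the request fields below are illustrative.

package replicassyncer

import (
	"context"
	"testing"

	admissionv1 "k8s.io/api/admission/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)

// TestHandleSkipsNonHPAKinds is a sketch: requests for kinds other than
// HorizontalPodAutoscaler must be allowed without touching the decoder,
// mapper, or scale client (hence all nil here).
func TestHandleSkipsNonHPAKinds(t *testing.T) {
	wh := NewHPAUpdateWebhook(nil, nil, nil, nil)

	resp := wh.Handle(context.TODO(), admission.Request{
		AdmissionRequest: admissionv1.AdmissionRequest{
			Kind:      metav1.GroupVersionKind{Kind: "Deployment"},
			Namespace: "default",
			Name:      "demo",
			Operation: admissionv1.Delete,
		},
	})

	if !resp.Allowed {
		t.Errorf("expected requests for non-HPA kinds to be allowed, got %+v", resp)
	}
}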