add replicas syncer for resources with HPA
Signed-off-by: lxtywypc <[email protected]>
lxtywypc committed Sep 18, 2023
1 parent 5c77f45 commit 546b202
Showing 7 changed files with 289 additions and 0 deletions.
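In short, this commit adds a replicas syncer: a controller that watches HorizontalPodAutoscalers on the Karmada control plane and writes each HPA's status.currentReplicas back into the target workload's scale subresource, so that the replicas declared on the control plane track the actual replicas running in member clusters. A companion admission handler, registered at /hpa-update-check, holds back HPA deletion until that write-back has completed.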
35 changes: 35 additions & 0 deletions cmd/controller-manager/app/controllermanager.go
@@ -15,6 +15,7 @@ import (
"k8s.io/client-go/informers"
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/scale"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/term"
"k8s.io/klog/v2"
@@ -23,11 +24,13 @@ import (
"k8s.io/metrics/pkg/client/external_metrics"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/config"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/healthz"
crtlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/webhook"

"github.com/karmada-io/karmada/cmd/controller-manager/app/options"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
@@ -53,6 +56,7 @@ import (
"github.com/karmada-io/karmada/pkg/features"
"github.com/karmada-io/karmada/pkg/karmadactl/util/apiclient"
"github.com/karmada-io/karmada/pkg/metrics"
"github.com/karmada-io/karmada/pkg/replicassyncer"
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
"github.com/karmada-io/karmada/pkg/sharedcli"
"github.com/karmada-io/karmada/pkg/sharedcli/klogflag"
@@ -205,6 +209,7 @@ func init() {
controllers["applicationFailover"] = startApplicationFailoverController
controllers["federatedHorizontalPodAutoscaler"] = startFederatedHorizontalPodAutoscalerController
controllers["cronFederatedHorizontalPodAutoscaler"] = startCronFederatedHorizontalPodAutoscalerController
controllers["replicasSyncer"] = startReplicasSyncerController
}

func startClusterController(ctx controllerscontext.Context) (enabled bool, err error) {
@@ -591,6 +596,36 @@ func startCronFederatedHorizontalPodAutoscalerController(ctx controllerscontext.Context) (enabled bool, err error) {
return true, nil
}

func startReplicasSyncerController(ctx controllerscontext.Context) (enabled bool, err error) {
hpaClient := kubeclientset.NewForConfigOrDie(ctx.Mgr.GetConfig())
scaleKindResolver := scale.NewDiscoveryScaleKindResolver(hpaClient.Discovery())
httpClient, err := rest.HTTPClientFor(ctx.Mgr.GetConfig())
if err != nil {
return false, err
}

mapper, err := apiutil.NewDiscoveryRESTMapper(ctx.Mgr.GetConfig(), httpClient)
if err != nil {
return false, err
}

scaleClient, err := scale.NewForConfig(ctx.Mgr.GetConfig(), mapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
if err != nil {
return false, err
}

replicasSyncer := replicassyncer.NewReplicasSyncer(ctx.Mgr.GetClient(), mapper, scaleClient)
err = replicasSyncer.SetupWithManager(ctx.Mgr)
if err != nil {
return false, err
}

ctx.Mgr.GetWebhookServer().Register("/hpa-update-check",
&webhook.Admission{Handler: replicassyncer.NewHPAUpdateWebhook(ctx.Mgr.GetClient(), mapper, scaleClient, replicasSyncer.GenericChan)})

return true, nil
}

// setupControllers initialize controllers and setup one by one.
func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) {
restConfig := mgr.GetConfig()
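Worth noting in the wiring above: the syncer has to read and update the scale subresource of arbitrary workload kinds, so it builds a discovery-based RESTMapper and ScaleKindResolver and hands them to a generic scale client rather than relying on typed clients. The same mapper and scale client are shared with the webhook handler registered at /hpa-update-check.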
102 changes: 102 additions & 0 deletions pkg/replicassyncer/controller.go
@@ -0,0 +1,102 @@
package replicassyncer

import (
"context"

autoscalingv1 "k8s.io/api/autoscaling/v1"
autoscalingv2 "k8s.io/api/autoscaling/v2"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/scale"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/source"
)

type ReplicasSyncer struct {
hostClient client.Client
mapper meta.RESTMapper
scaleClient scale.ScalesGetter

GenericChan chan<- event.GenericEvent
}

func NewReplicasSyncer(hostClient client.Client, mapper meta.RESTMapper, scaleClient scale.ScalesGetter) *ReplicasSyncer {
return &ReplicasSyncer{
hostClient: hostClient,
mapper: mapper,
scaleClient: scaleClient,
}
}

func (r *ReplicasSyncer) SetupWithManager(mgr controllerruntime.Manager) error {
ch := make(chan event.GenericEvent)
r.GenericChan = ch
return controllerruntime.NewControllerManagedBy(mgr).Named("replicas-syncer").
For(&autoscalingv2.HorizontalPodAutoscaler{},
builder.WithPredicates(HPAPredicate)).
WatchesRawSource(&source.Channel{Source: ch}, &handler.EnqueueRequestForObject{}, builder.WithPredicates(HPAPredicate)).
Complete(r)
}

func (r *ReplicasSyncer) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) {
klog.V(4).Infof("Reconciling for HPA %s/%s", req.Namespace, req.Name)

hpa := &autoscalingv2.HorizontalPodAutoscaler{}
err := r.hostClient.Get(ctx, req.NamespacedName, hpa)
if err != nil {
if errors.IsNotFound(err) {
return controllerruntime.Result{}, nil
}

return controllerruntime.Result{}, err
}

workloadGR, scale, err := GetGroupResourceAndScaleForWorkloadFromHPA(ctx, r.mapper, r.scaleClient, hpa)
if err != nil {
return controllerruntime.Result{}, err
}

// If the scale of workload is not found, skip processing.
if scale == nil {
klog.Infof("Scale of resource(kind=%s, %s/%s) not found, the resource might have been removed, skip",
hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name)

return controllerruntime.Result{}, nil
}

err = r.updateScaleIfNeed(ctx, workloadGR, scale, hpa)
if err != nil {
return controllerruntime.Result{}, err
}

return controllerruntime.Result{}, nil
}

// updateScaleIfNeed updates the scale of the workload on the fed-control-plane
// if the replicas declared in the workload on the fed-control-plane do not match
// the actual replicas in member clusters as set by the HPA.
func (r *ReplicasSyncer) updateScaleIfNeed(ctx context.Context, workloadGR schema.GroupResource, scale *autoscalingv1.Scale, hpa *autoscalingv2.HorizontalPodAutoscaler) error {
if scale.Spec.Replicas != hpa.Status.CurrentReplicas {
oldReplicas := scale.Spec.Replicas

scale.Spec.Replicas = hpa.Status.CurrentReplicas
_, err := r.scaleClient.Scales(hpa.Namespace).Update(ctx, workloadGR, scale, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("Failed to try to sync scale for resource(kind=%s, %s/%s) from %d to %d: %v",
hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, oldReplicas, hpa.Status.CurrentReplicas, err)
return err
}

klog.Infof("Successfully synced scale for resource(kind=%s, %s/%s) from %d to %d",
hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, oldReplicas, hpa.Status.CurrentReplicas)
}

return nil
}
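The channel wired up in SetupWithManager is how the admission webhook (webhook.go below) nudges this controller: after denying an HPA delete, it sends the HPA down GenericChan so the syncer gets another chance to reconcile. A minimal sketch of that flow, with a hypothetical HPA object; note that the predicate's GenericFunc inspects the object's kind, so TypeMeta must be populated (objects decoded from admission requests carry it):

// Illustration only, not part of this commit: requeue an HPA through the
// syncer's generic channel. syncer is assumed to have been set up via
// NewReplicasSyncer + SetupWithManager, which populates GenericChan.
hpa := &autoscalingv2.HorizontalPodAutoscaler{
	TypeMeta: metav1.TypeMeta{
		APIVersion: "autoscaling/v2",
		Kind:       "HorizontalPodAutoscaler", // required by the GenericFunc predicate
	},
	ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "sample-hpa"},
}
syncer.GenericChan <- event.GenericEvent{Object: hpa}
// Reconcile then re-reads the HPA and syncs the scale subresource if needed.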
3 changes: 3 additions & 0 deletions pkg/replicassyncer/controller_test.go
@@ -0,0 +1,3 @@
package replicassyncer

// TODO(@lxtywypc): Add unit test.
72 changes: 72 additions & 0 deletions pkg/replicassyncer/utils.go
@@ -0,0 +1,72 @@
package replicassyncer

import (
"context"

autoscalingv1 "k8s.io/api/autoscaling/v1"
autoscalingv2 "k8s.io/api/autoscaling/v2"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/scale"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

const KindHPA = "HorizontalPodAutoscaler"

var HPAPredicate = predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
return false
},
UpdateFunc: func(e event.UpdateEvent) bool {
oldHPA, ok := e.ObjectOld.(*autoscalingv2.HorizontalPodAutoscaler)
if !ok {
return false
}

newHPA, ok := e.ObjectNew.(*autoscalingv2.HorizontalPodAutoscaler)
if !ok {
return false
}

return oldHPA.Status.CurrentReplicas != newHPA.Status.CurrentReplicas
},
DeleteFunc: func(e event.DeleteEvent) bool {
return true
},
GenericFunc: func(e event.GenericEvent) bool {
return e.Object.GetObjectKind().GroupVersionKind().Kind == KindHPA
},
}

func GetGroupResourceAndScaleForWorkloadFromHPA(ctx context.Context, mapper meta.RESTMapper, scaleClient scale.ScalesGetter, hpa *autoscalingv2.HorizontalPodAutoscaler,
) (schema.GroupResource, *autoscalingv1.Scale, error) {
gvk := schema.FromAPIVersionAndKind(hpa.Spec.ScaleTargetRef.APIVersion, hpa.Spec.ScaleTargetRef.Kind)
mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
klog.Errorf("Failed to get group resource for resource(kind=%s, %s/%s): %v",
hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, err)

return schema.GroupResource{}, nil, err
}

gr := mapping.Resource.GroupResource()

scale, err := scaleClient.Scales(hpa.Namespace).Get(ctx, gr, hpa.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
// If the scale of workload is not found, skip processing.
return gr, nil, nil
}

klog.Errorf("Failed to get scale for resource(kind=%s, %s/%s): %v",
hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, err)

return schema.GroupResource{}, nil, err
}

return gr, scale, nil
}
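To make the mapping above concrete: for a common HPA whose scaleTargetRef is apiVersion: apps/v1, kind: Deployment, the lookup resolves to the deployments group resource, and the scale client then fetches that Deployment's scale subresource. A hedged sketch with hypothetical namespace and workload name:

// Illustration only: what GetGroupResourceAndScaleForWorkloadFromHPA does for
// a Deployment-targeting HPA.
gvk := schema.FromAPIVersionAndKind("apps/v1", "Deployment")
mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
	return schema.GroupResource{}, nil, err
}
gr := mapping.Resource.GroupResource() // {Group: "apps", Resource: "deployments"}
scale, err := scaleClient.Scales("default").Get(ctx, gr, "sample-deployment", metav1.GetOptions{})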
3 changes: 3 additions & 0 deletions pkg/replicassyncer/utils_test.go
@@ -0,0 +1,3 @@
package replicassyncer

// TODO(@lxtywypc): Add unit test.
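As a starting point for the TODO above, a sketch of a unit test for HPAPredicate's update filter (an assumption of what the eventual test might look like, not part of this commit):

package replicassyncer

import (
	"testing"

	autoscalingv2 "k8s.io/api/autoscaling/v2"
	"sigs.k8s.io/controller-runtime/pkg/event"
)

// The predicate should pass updates only when status.currentReplicas changes.
func TestHPAPredicateUpdate(t *testing.T) {
	oldHPA := &autoscalingv2.HorizontalPodAutoscaler{}
	oldHPA.Status.CurrentReplicas = 2
	newHPA := &autoscalingv2.HorizontalPodAutoscaler{}
	newHPA.Status.CurrentReplicas = 3

	if !HPAPredicate.Update(event.UpdateEvent{ObjectOld: oldHPA, ObjectNew: newHPA}) {
		t.Error("expected update with changed CurrentReplicas to be reconciled")
	}

	newHPA.Status.CurrentReplicas = 2
	if HPAPredicate.Update(event.UpdateEvent{ObjectOld: oldHPA, ObjectNew: newHPA}) {
		t.Error("expected update with unchanged CurrentReplicas to be filtered out")
	}
}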
71 changes: 71 additions & 0 deletions pkg/replicassyncer/webhook.go
@@ -0,0 +1,71 @@
package replicassyncer

import (
"context"
"net/http"

admissionv1 "k8s.io/api/admission/v1"
autoscalingv2 "k8s.io/api/autoscaling/v2"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/scale"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)

type HPAUpdaterWebhook struct {
hostClient client.Client
decoder *admission.Decoder
mapper meta.RESTMapper
scaleClient scale.ScalesGetter
genericChan chan<- event.GenericEvent
}

func NewHPAUpdateWebhook(hostClient client.Client, mapper meta.RESTMapper, scaleClient scale.ScalesGetter, genericChan chan<- event.GenericEvent) *HPAUpdaterWebhook {
return &HPAUpdaterWebhook{
hostClient: hostClient,
mapper: mapper,
scaleClient: scaleClient,
genericChan: genericChan,
}
}

func (c *HPAUpdaterWebhook) Handle(ctx context.Context, req admission.Request) admission.Response {
if req.Kind.Kind != KindHPA {
klog.Warningf("Unsupported kind object %s/%s/%s, skip.", req.Kind.Kind, req.Namespace, req.Name)
return admission.Allowed("")
}

oldHPA := &autoscalingv2.HorizontalPodAutoscaler{}
err := c.decoder.DecodeRaw(req.OldObject, oldHPA)
if err != nil {
klog.Errorf("Failed to decode object %s/%s: %v", req.Namespace, req.Name, err)
return admission.Errored(http.StatusInternalServerError, err)
}

switch req.Operation {
case admissionv1.Delete:
// When deleting, we should block it to make sure the workload replicas have been synced.
_, scale, err := GetGroupResourceAndScaleForWorkloadFromHPA(ctx, c.mapper, c.scaleClient, oldHPA)
if err != nil {
klog.Errorf("Denied %s combinator %s/%s: %v.", req.Operation, req.Namespace, req.Name, err)
return admission.Errored(http.StatusInternalServerError, err)
}

// The scale may be nil if the target workload has already been removed;
// in that case there is nothing left to sync and the delete can go through.
if scale != nil && scale.Spec.Replicas != oldHPA.Status.CurrentReplicas {
go func() {
c.genericChan <- event.GenericEvent{Object: oldHPA}
}()

return admission.Denied("member clusters' replicas haven't been synced yet")
}
}

return admission.Allowed("")
}

func (c *HPAUpdaterWebhook) InjectDecoder(d *admission.Decoder) error {
c.decoder = d
return nil
}
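A note on the design: denying the DELETE (instead of allowing it and syncing afterwards) keeps the HPA object around until the syncer has written the member-cluster replicas back to the control-plane workload; the caller can simply retry the delete, which is allowed once scale.Spec.Replicas matches status.currentReplicas. Routing DELETE operations on autoscaling/v2 HorizontalPodAutoscalers to /hpa-update-check also requires a webhook configuration manifest, which is not part of this commit.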
3 changes: 3 additions & 0 deletions pkg/replicassyncer/webhook_test.go
@@ -0,0 +1,3 @@
package replicassyncer

// TODO(@lxtywypc): Add unit test.
