add replicas syncer for resources with HPA #4072

Merged: 1 commit, merged Sep 21, 2023
25 changes: 24 additions & 1 deletion cmd/controller-manager/app/controllermanager.go
@@ -15,6 +15,7 @@ import (
"k8s.io/client-go/informers"
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/scale"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/term"
"k8s.io/klog/v2"
@@ -44,6 +45,7 @@ import (
metricsclient "github.com/karmada-io/karmada/pkg/controllers/federatedhpa/metrics"
"github.com/karmada-io/karmada/pkg/controllers/federatedresourcequota"
"github.com/karmada-io/karmada/pkg/controllers/gracefuleviction"
"github.com/karmada-io/karmada/pkg/controllers/hpareplicassyncer"
"github.com/karmada-io/karmada/pkg/controllers/mcs"
"github.com/karmada-io/karmada/pkg/controllers/namespace"
"github.com/karmada-io/karmada/pkg/controllers/status"
@@ -185,7 +187,7 @@ func Run(ctx context.Context, opts *options.Options) error {
var controllers = make(controllerscontext.Initializers)

// controllersDisabledByDefault is the set of controllers which are disabled by default
var controllersDisabledByDefault = sets.New("")
var controllersDisabledByDefault = sets.New("hpaReplicasSyncer")

func init() {
controllers["cluster"] = startClusterController
@@ -205,6 +207,7 @@ func init() {
controllers["applicationFailover"] = startApplicationFailoverController
controllers["federatedHorizontalPodAutoscaler"] = startFederatedHorizontalPodAutoscalerController
controllers["cronFederatedHorizontalPodAutoscaler"] = startCronFederatedHorizontalPodAutoscalerController
controllers["hpaReplicasSyncer"] = startHPAReplicasSyncerController
}

func startClusterController(ctx controllerscontext.Context) (enabled bool, err error) {
@@ -591,6 +594,26 @@ func startCronFederatedHorizontalPodAutoscalerController(ctx controllerscontext.
return true, nil
}

func startHPAReplicasSyncerController(ctx controllerscontext.Context) (enabled bool, err error) {
scaleKindResolver := scale.NewDiscoveryScaleKindResolver(ctx.KubeClientSet.Discovery())
scaleClient, err := scale.NewForConfig(ctx.Mgr.GetConfig(), ctx.Mgr.GetRESTMapper(), dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
if err != nil {
return false, err
}

hpaReplicasSyncer := hpareplicassyncer.HPAReplicasSyncer{
Client: ctx.Mgr.GetClient(),
RESTMapper: ctx.Mgr.GetRESTMapper(),
ScaleClient: scaleClient,
}
err = hpaReplicasSyncer.SetupWithManager(ctx.Mgr)
if err != nil {
return false, err
}

return true, nil
}

// setupControllers initializes controllers and sets them up one by one.
func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) {
restConfig := mgr.GetConfig()
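Note that hpaReplicasSyncer is added to controllersDisabledByDefault above, so the new controller stays off unless it is enabled explicitly. Assuming karmada-controller-manager follows the usual --controllers flag convention (where "*" keeps all on-by-default controllers and a bare name turns an off-by-default one on), enabling it would look like this:

	--controllers=*,hpaReplicasSyncer
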
148 changes: 148 additions & 0 deletions pkg/controllers/hpareplicassyncer/hpa_replicas_syncer_controller.go
@@ -0,0 +1,148 @@
package hpareplicassyncer

import (
"context"

autoscalingv1 "k8s.io/api/autoscaling/v1"
autoscalingv2 "k8s.io/api/autoscaling/v2"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/scale"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

var hpaPredicate = predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
return false
},
UpdateFunc: func(e event.UpdateEvent) bool {
oldHPA, ok := e.ObjectOld.(*autoscalingv2.HorizontalPodAutoscaler)
if !ok {
return false
}

newHPA, ok := e.ObjectNew.(*autoscalingv2.HorizontalPodAutoscaler)
if !ok {
return false
}

return oldHPA.Status.CurrentReplicas != newHPA.Status.CurrentReplicas

[Review comment, Member] Can you explain why you use CurrentReplicas instead of DesiredReplicas?

},
DeleteFunc: func(e event.DeleteEvent) bool {
return false
},
}

// HPAReplicasSyncer syncs replicas from the status of an HPA to the resource template.
type HPAReplicasSyncer struct {
Client client.Client
RESTMapper meta.RESTMapper
ScaleClient scale.ScalesGetter
}

// SetupWithManager creates a controller and registers it with the controller manager.
func (r *HPAReplicasSyncer) SetupWithManager(mgr controllerruntime.Manager) error {
return controllerruntime.NewControllerManagedBy(mgr).Named("replicas-syncer").
For(&autoscalingv2.HorizontalPodAutoscaler{}, builder.WithPredicates(hpaPredicate)).
Complete(r)
}

// Reconcile performs a full reconciliation for the object referred to by the Request.
// The Controller will requeue the Request to be processed again if an error is non-nil or
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
func (r *HPAReplicasSyncer) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) {
klog.V(4).Infof("Reconciling for HPA %s/%s", req.Namespace, req.Name)

hpa := &autoscalingv2.HorizontalPodAutoscaler{}
err := r.Client.Get(ctx, req.NamespacedName, hpa)
if err != nil {
if apierrors.IsNotFound(err) {
return controllerruntime.Result{}, nil
}

return controllerruntime.Result{}, err
}

workloadGR, scale, err := r.getGroupResourceAndScaleForWorkloadFromHPA(ctx, hpa)
if err != nil {
return controllerruntime.Result{}, err
}

err = r.updateScaleIfNeed(ctx, workloadGR, scale.DeepCopy(), hpa)
if err != nil {
return controllerruntime.Result{}, err
}

// TODO(@lxtywypc): Add finalizer for HPA and remove them
// when the HPA is deleting and the replicas have been synced.

return controllerruntime.Result{}, nil
}

// getGroupResourceAndScaleForWorkloadFromHPA parses the GroupResource and gets the Scale
// of the workload declared in spec.scaleTargetRef of the HPA.
func (r *HPAReplicasSyncer) getGroupResourceAndScaleForWorkloadFromHPA(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler,
) (schema.GroupResource, *autoscalingv1.Scale, error) {
gvk := schema.FromAPIVersionAndKind(hpa.Spec.ScaleTargetRef.APIVersion, hpa.Spec.ScaleTargetRef.Kind)
mapping, err := r.RESTMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
klog.Errorf("Failed to get group resource for resource(kind=%s, %s/%s): %v",
hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, err)

return schema.GroupResource{}, nil, err
}

gr := mapping.Resource.GroupResource()

scale, err := r.ScaleClient.Scales(hpa.Namespace).Get(ctx, gr, hpa.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
// If the scale of workload is not found, skip processing.
return gr, nil, nil
}

klog.Errorf("Failed to get scale for resource(kind=%s, %s/%s): %v",
hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, err)

return schema.GroupResource{}, nil, err
}

return gr, scale, nil
}

// updateScaleIfNeed updates the scale of the workload on the Karmada control plane
// if the replicas declared in the resource template do not match the actual replicas
// in member clusters as adjusted by the HPA.
func (r *HPAReplicasSyncer) updateScaleIfNeed(ctx context.Context, workloadGR schema.GroupResource, scale *autoscalingv1.Scale, hpa *autoscalingv2.HorizontalPodAutoscaler) error {
// If the scale of workload is not found, skip processing.
if scale == nil {
klog.V(4).Infof("Scale of resource(kind=%s, %s/%s) not found, the resource might have been removed, skip",
hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name)

return nil
}

if scale.Spec.Replicas != hpa.Status.CurrentReplicas {

[Review thread]

Member: Can you explain why you are using CurrentReplicas instead of DesiredReplicas?

Member: I think the .status.currentReplicas represents the real replicas of a managed workload, but the .status.desiredReplicas is the planned replicas, which might not represent the current situation.

XiShanYongYe-Chang (Member, Sep 20, 2023): Hi @lxtywypc, can you help update it?

Wait a moment, let me have a try.

Member: @chaunceyjiang I guess you mean that .spec.replicas represents the desired replicas, which matches the definition of .status.desiredReplicas in HPA, right?

The difference between the two approaches is when to sync the replicas: after or before the replica change. It makes more sense to sync the desired replicas; it looks as if the HPA in the Karmada control plane scales the workload :)

Member: Hi @lxtywypc, what do you think?

Member: > I guess you mean that .spec.replicas represents the desired replicas, which matches the definition of .status.desiredReplicas in HPA, right?

Yes, I mean exactly this.

Member: So I prefer to use .status.desiredReplicas because its meaning is consistent with .spec.replicas.

Member: Thanks @chaunceyjiang, that sounds good to me. Given that the following task relies on this controller, I suppose we can move this forward and leave this change for another task. What do you think? @chaunceyjiang @XiShanYongYe-Chang @lxtywypc

Member: OK

(A sketch of this follow-up appears after the end of this file's diff.)

oldReplicas := scale.Spec.Replicas

scale.Spec.Replicas = hpa.Status.CurrentReplicas
_, err := r.ScaleClient.Scales(hpa.Namespace).Update(ctx, workloadGR, scale, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("Failed to try to sync scale for resource(kind=%s, %s/%s) from %d to %d: %v",
hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, oldReplicas, hpa.Status.CurrentReplicas, err)
return err
}

klog.V(4).Infof("Successfully synced scale for resource(kind=%s, %s/%s) from %d to %d",
hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, oldReplicas, hpa.Status.CurrentReplicas)
}

return nil
}
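
The review thread above settles on syncing .status.desiredReplicas rather than .status.currentReplicas, with the change left to a follow-up task. A minimal sketch of that follow-up, assuming the same package and imports as hpa_replicas_syncer_controller.go and changing only the field that is compared and synced (everything else stays as in this PR):

// Sketch of the agreed follow-up (not part of this PR): the predicate and
// updateScaleIfNeed compare and sync the HPA's desired replicas instead.
var hpaPredicate = predicate.Funcs{
	CreateFunc: func(e event.CreateEvent) bool { return false },
	DeleteFunc: func(e event.DeleteEvent) bool { return false },
	UpdateFunc: func(e event.UpdateEvent) bool {
		oldHPA, ok := e.ObjectOld.(*autoscalingv2.HorizontalPodAutoscaler)
		if !ok {
			return false
		}

		newHPA, ok := e.ObjectNew.(*autoscalingv2.HorizontalPodAutoscaler)
		if !ok {
			return false
		}

		// Trigger only when the HPA's planned replica count changes.
		return oldHPA.Status.DesiredReplicas != newHPA.Status.DesiredReplicas
	},
}

// updateScaleIfNeed (sketch): write the HPA's desired replicas into the workload's scale subresource.
func (r *HPAReplicasSyncer) updateScaleIfNeed(ctx context.Context, workloadGR schema.GroupResource, scale *autoscalingv1.Scale, hpa *autoscalingv2.HorizontalPodAutoscaler) error {
	if scale == nil {
		// The workload (or its scale subresource) is gone; nothing to sync.
		return nil
	}

	if scale.Spec.Replicas != hpa.Status.DesiredReplicas {
		scale.Spec.Replicas = hpa.Status.DesiredReplicas
		if _, err := r.ScaleClient.Scales(hpa.Namespace).Update(ctx, workloadGR, scale, metav1.UpdateOptions{}); err != nil {
			return err
		}
	}

	return nil
}

This keeps the controller wiring from the PR intact while making the synced value consistent with .spec.replicas, as discussed in the thread.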