add replicas syncer for resources with HPA
Signed-off-by: lxtywypc <[email protected]>
lxtywypc committed Sep 18, 2023
1 parent 5c77f45 commit ea4500d
Showing 15 changed files with 403 additions and 0 deletions.
11 changes: 11 additions & 0 deletions artifacts/deploy/karmada-controller-manager-cert-secret.yaml
@@ -0,0 +1,11 @@
apiVersion: v1
kind: Secret
metadata:
name: controller-manager-cert
namespace: karmada-system
type: kubernetes.io/tls
data:
tls.crt: |
{{server_certificate}}
tls.key: |
{{server_key}}
9 changes: 9 additions & 0 deletions artifacts/deploy/karmada-controller-manager.yaml
@@ -31,6 +31,9 @@ spec:
- --secure-port=10357
- --failover-eviction-timeout=30s
- --v=4
- --webhook-bind-address=0.0.0.0
- --webhook-secure-port=8443
- --webhook-cert-dir=/var/serving-cert
livenessProbe:
httpGet:
path: /healthz
@@ -44,7 +47,13 @@ spec:
- name: kubeconfig
subPath: kubeconfig
mountPath: /etc/kubeconfig
- name: cert
mountPath: /var/serving-cert
readOnly: true
volumes:
- name: kubeconfig
secret:
secretName: kubeconfig
- name: cert
secret:
secretName: controller-manager-cert
14 changes: 14 additions & 0 deletions artifacts/deploy/webhook-configuration.yaml
@@ -237,3 +237,17 @@ webhooks:
sideEffects: None
admissionReviewVersions: [ "v1" ]
timeoutSeconds: 3
- name: autoscaling.k8s.io
rules:
- operations: ["DELETE"]
apiGroups: ["autoscaling"]
apiVersions: ["*"]
resources: ["horizontalpodautoscalers"]
scope: "Namespaced"
clientConfig:
url: https://karmada-controller-manager.karmada-system.svc:8443/validate-horizontalpodautoscaler
caBundle: {{caBundle}}
failurePolicy: Fail
sideEffects: None
admissionReviewVersions: [ "v1" ]
timeoutSeconds: 3
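
The entry above registers a validating webhook that intercepts DELETE operations on HorizontalPodAutoscaler objects (any API version in the autoscaling group) and forwards them to karmada-controller-manager at /validate-horizontalpodautoscaler. This gives the replicas syncer a chance to run one final sync before the HPA disappears, so the workload keeps the replica count the HPA last applied. The handler itself ships in pkg/replicassyncer and is not rendered on this page; the sketch below shows roughly what such a handler looks like with controller-runtime's admission package. Type and field names are illustrative, not the commit's actual code, and Decoder details vary slightly across controller-runtime versions.

package replicassyncer

import (
	"context"
	"net/http"

	autoscalingv2 "k8s.io/api/autoscaling/v2"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)

// hpaDeletionHandler is a sketch only, not the commit's implementation.
type hpaDeletionHandler struct {
	decoder *admission.Decoder
	ch      chan<- event.GenericEvent
}

func (h *hpaDeletionHandler) Handle(ctx context.Context, req admission.Request) admission.Response {
	hpa := &autoscalingv2.HorizontalPodAutoscaler{}
	// For DELETE operations the object being removed arrives in OldObject.
	if err := h.decoder.DecodeRaw(req.OldObject, hpa); err != nil {
		return admission.Errored(http.StatusBadRequest, err)
	}
	// Nudge the syncer to reconcile this HPA's target workload one last time.
	h.ch <- event.GenericEvent{Object: hpa}
	return admission.Allowed("")
}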
14 changes: 14 additions & 0 deletions charts/karmada/templates/_karmada_webhook_configuration.tpl
@@ -213,4 +213,18 @@ webhooks:
sideEffects: None
admissionReviewVersions: [ "v1" ]
timeoutSeconds: 3
- name: autoscaling.k8s.io
rules:
- operations: ["DELETE"]
apiGroups: ["autoscaling"]
apiVersions: ["*"]
resources: ["horizontalpodautoscalers"]
scope: "Namespaced"
clientConfig:
url: https://{{ $name }}-controller-manager.{{ $namespace }}.svc:8443/validate-horizontalpodautoscaler
{{- include "karmada.webhook.caBundle" . | nindent 6 }}
failurePolicy: Fail
sideEffects: None
admissionReviewVersions: [ "v1" ]
timeoutSeconds: 3
{{- end -}}
12 changes: 12 additions & 0 deletions charts/karmada/templates/karmada-cert.yaml
@@ -33,6 +33,12 @@ data:
tls.key: |
{{ b64enc .Values.certs.custom.key }}
---
apiVersion: v1
kind: Secret
metadata:
name: {{ include "karmada.name" . }}-controller-manager-cert
namespace: {{ include "karmada.namespace" . }}
type: kubernetes.io/tls
data:
tls.crt: |
{{ b64enc .Values.certs.custom.crt }}
tls.key: |
{{ b64enc .Values.certs.custom.key }}
---
{{- end }}

{{- if and (eq .Values.installMode "host") (eq .Values.etcd.mode "external") }}
9 changes: 9 additions & 0 deletions charts/karmada/templates/karmada-controller-manager.yaml
@@ -43,6 +43,9 @@ spec:
{{- end }}
volumes:
{{- include "karmada.kubeconfig.volume" . | nindent 8 }}
- name: {{ $name }}-controller-manager-cert-secret
secret:
secretName: {{ $name }}-controller-manager-cert
containers:
- name: {{ $name }}-controller-manager
image: {{ template "karmada.controllerManager.image" . }}
@@ -55,6 +58,9 @@ spec:
- --secure-port=10357
- --leader-elect-resource-namespace={{ $systemNamespace }}
- --v=2
- --webhook-bind-address=0.0.0.0
- --webhook-secure-port=8443
- --webhook-cert-dir=/var/serving-cert
{{- if .Values.controllerManager.controllers }}
- --controllers={{ .Values.controllerManager.controllers }}
{{- end }}
@@ -73,6 +79,9 @@ spec:
timeoutSeconds: 5
volumeMounts:
{{- include "karmada.kubeconfig.volumeMount" . | nindent 12 }}
- name: {{ $name }}-controller-manager-cert-secret
mountPath: /var/serving-cert
readOnly: true
resources:
{{- toYaml .Values.controllerManager.resources | nindent 12 }}

43 changes: 43 additions & 0 deletions cmd/controller-manager/app/controllermanager.go
@@ -15,6 +15,7 @@ import (
"k8s.io/client-go/informers"
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/scale"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/term"
"k8s.io/klog/v2"
@@ -23,11 +24,13 @@ import (
"k8s.io/metrics/pkg/client/external_metrics"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/config"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/healthz"
crtlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/webhook"

"github.com/karmada-io/karmada/cmd/controller-manager/app/options"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
@@ -53,6 +56,7 @@ import (
"github.com/karmada-io/karmada/pkg/features"
"github.com/karmada-io/karmada/pkg/karmadactl/util/apiclient"
"github.com/karmada-io/karmada/pkg/metrics"
"github.com/karmada-io/karmada/pkg/replicassyncer"
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
"github.com/karmada-io/karmada/pkg/sharedcli"
"github.com/karmada-io/karmada/pkg/sharedcli/klogflag"
@@ -155,6 +159,14 @@ func Run(ctx context.Context, opts *options.Options) error {
opts.DefaultTransform = fedinformer.StripUnusedFields
return cache.New(config, opts)
},
WebhookServer: webhook.NewServer(webhook.Options{
Host: opts.WebhookBindAddress,
Port: opts.WebhookSecurePort,
CertDir: opts.WebhookCertDir,
CertName: opts.WebhookCertName,
KeyName: opts.WebhookKeyName,
TLSMinVersion: opts.WebhookTLSMinVersion,
}),
})
if err != nil {
klog.Errorf("Failed to build controller manager: %v", err)
@@ -205,6 +217,7 @@ func init() {
controllers["applicationFailover"] = startApplicationFailoverController
controllers["federatedHorizontalPodAutoscaler"] = startFederatedHorizontalPodAutoscalerController
controllers["cronFederatedHorizontalPodAutoscaler"] = startCronFederatedHorizontalPodAutoscalerController
controllers["replicasSyncer"] = startReplicasSyncerController
}

func startClusterController(ctx controllerscontext.Context) (enabled bool, err error) {
@@ -591,6 +604,36 @@ func startCronFederatedHorizontalPodAutoscalerController(ctx controllerscontext.
return true, nil
}

func startReplicasSyncerController(ctx controllerscontext.Context) (enabled bool, err error) {
hpaClient := kubeclientset.NewForConfigOrDie(ctx.Mgr.GetConfig())
scaleKindResolver := scale.NewDiscoveryScaleKindResolver(hpaClient.Discovery())
httpClient, err := rest.HTTPClientFor(ctx.Mgr.GetConfig())
if err != nil {
return false, err
}

mapper, err := apiutil.NewDiscoveryRESTMapper(ctx.Mgr.GetConfig(), httpClient)
if err != nil {
return false, err
}

scaleClient, err := scale.NewForConfig(ctx.Mgr.GetConfig(), mapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
if err != nil {
return false, err
}

replicasSyncer := replicassyncer.NewReplicasSyncer(ctx.Mgr.GetClient(), mapper, scaleClient)
err = replicasSyncer.SetupWithManager(ctx.Mgr)
if err != nil {
return false, err
}

ctx.Mgr.GetWebhookServer().Register("/validate-horizontalpodautoscaler",
&webhook.Admission{Handler: replicassyncer.NewHPAUpdateWebhook(ctx.Mgr.GetClient(), mapper, scaleClient, replicasSyncer.GenericChan)})

return true, nil
}

// setupControllers initialize controllers and setup one by one.
func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) {
restConfig := mgr.GetConfig()
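
startReplicasSyncerController assembles everything the syncer needs from the manager's rest.Config: a discovery-backed scale-kind resolver, a RESTMapper, and a scale client, so the syncer can read and write the /scale subresource of whatever workload kind an HPA targets. It also registers the HPA deletion webhook on the manager's built-in webhook server. The exported GenericChan is the bridge between the two; a short usage sketch, assuming mgr, mapper, scaleClient, and an *autoscalingv2.HorizontalPodAutoscaler named hpa are already in scope:

syncer := replicassyncer.NewReplicasSyncer(mgr.GetClient(), mapper, scaleClient)
if err := syncer.SetupWithManager(mgr); err != nil {
	return err
}
// SetupWithManager populates GenericChan; sending an event enqueues a
// reconcile for the referenced HPA without waiting for a watch event.
syncer.GenericChan <- event.GenericEvent{Object: hpa}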
33 changes: 33 additions & 0 deletions cmd/controller-manager/app/options/options.go
@@ -21,6 +21,11 @@ import (
const (
defaultBindAddress = "0.0.0.0"
defaultPort = 10357

defaultWebhookBindAddress = "0.0.0.0"
defaultWebhookPort = 8443
defaultWebhookCertDir = "/tmp/k8s-webhook-server/serving-certs"
defaultWebhookTLSMinVersion = "1.3"
)

var (
@@ -132,6 +137,25 @@ type Options struct {
// in scenario of dynamic replica assignment based on cluster free resources.
// Disable if it does not fit your cases for better performance.
EnableClusterResourceModeling bool

// WebhookBindAddress is the IP address on which to listen for the --webhook-secure-port port.
// Default is "0.0.0.0".
WebhookBindAddress string
// WebhookSecurePort is the port that the webhook server serves at.
// Default is 8443.
WebhookSecurePort int
// WebhookCertDir is the directory that contains the server key and certificate.
// If not set, the webhook server looks for them in {TempDir}/k8s-webhook-server/serving-certs.
WebhookCertDir string
// WebhookCertName is the server certificate name. Defaults to tls.crt.
WebhookCertName string
// WebhookKeyName is the server key name. Defaults to tls.key.
WebhookKeyName string
// WebhookTLSMinVersion is the minimum TLS version supported. Possible values: 1.0, 1.1, 1.2, 1.3.
// Some environments run automated security scans that flag older TLS versions or insecure cipher
// suites; setting the minimum to 1.3 avoids both.
// Defaults to 1.3.
WebhookTLSMinVersion string
}

// NewOptions builds an empty options.
@@ -216,6 +240,15 @@ func (o *Options) AddFlags(flags *pflag.FlagSet, allControllers, disabledByDefau
flags.BoolVar(&o.EnableClusterResourceModeling, "enable-cluster-resource-modeling", true, "Enable means controller would build resource modeling for each cluster by syncing Nodes and Pods resources.\n"+
"The resource modeling might be used by the scheduler to make scheduling decisions in scenario of dynamic replica assignment based on cluster free resources.\n"+
"Disable if it does not fit your cases for better performance.")
flags.StringVar(&o.WebhookBindAddress, "webhook-bind-address", defaultWebhookBindAddress,
"The IP address on which to listen for the --webhook-secure-port port.")
flags.IntVar(&o.WebhookSecurePort, "webhook-secure-port", defaultWebhookPort,
"The secure port on which the webhook serves HTTPS.")
flags.StringVar(&o.WebhookCertDir, "webhook-cert-dir", defaultWebhookCertDir,
"The directory that contains the server key and certificate for webhook.")
flags.StringVar(&o.WebhookCertName, "webhook-tls-cert-file-name", "tls.crt", "The name of server certificate for webhook.")
flags.StringVar(&o.WebhookKeyName, "webhook-tls-private-key-file-name", "tls.key", "The name of server key for webhook.")
flags.StringVar(&o.WebhookTLSMinVersion, "webhook-tls-min-version", defaultWebhookTLSMinVersion, "Minimum TLS version supported for webhook. Possible values: 1.0, 1.1, 1.2, 1.3.")

o.RateLimiterOpts.AddFlags(flags)
o.ProfileOpts.AddFlags(flags)
4 changes: 4 additions & 0 deletions hack/deploy-karmada.sh
@@ -84,6 +84,7 @@ function generate_cert_secret {
cp -rf "${REPO_ROOT}"/artifacts/deploy/karmada-cert-secret.yaml "${TEMP_PATH}"/karmada-cert-secret-tmp.yaml
cp -rf "${REPO_ROOT}"/artifacts/deploy/secret.yaml "${TEMP_PATH}"/secret-tmp.yaml
cp -rf "${REPO_ROOT}"/artifacts/deploy/karmada-webhook-cert-secret.yaml "${TEMP_PATH}"/karmada-webhook-cert-secret-tmp.yaml
cp -rf "${REPO_ROOT}"/artifacts/deploy/karmada-controller-manager-cert-secret.yaml "${TEMP_PATH}"/karmada-controller-manager-cert-secret-tmp.yaml

sed -i'' -e "s/{{ca_crt}}/${karmada_ca}/g" "${TEMP_PATH}"/karmada-cert-secret-tmp.yaml
sed -i'' -e "s/{{ca_key}}/${karmada_ca_key}/g" "${TEMP_PATH}"/karmada-cert-secret-tmp.yaml
@@ -108,10 +109,13 @@

sed -i'' -e "s/{{server_key}}/${KARMADA_KEY}/g" "${TEMP_PATH}"/karmada-webhook-cert-secret-tmp.yaml
sed -i'' -e "s/{{server_certificate}}/${KARMADA_CRT}/g" "${TEMP_PATH}"/karmada-webhook-cert-secret-tmp.yaml
sed -i'' -e "s/{{server_key}}/${KARMADA_KEY}/g" "${TEMP_PATH}"/karmada-controller-manager-cert-secret-tmp.yaml
sed -i'' -e "s/{{server_certificate}}/${KARMADA_CRT}/g" "${TEMP_PATH}"/karmada-controller-manager-cert-secret-tmp.yaml

kubectl --context="${HOST_CLUSTER_NAME}" apply -f "${TEMP_PATH}"/karmada-cert-secret-tmp.yaml
kubectl --context="${HOST_CLUSTER_NAME}" apply -f "${TEMP_PATH}"/secret-tmp.yaml
kubectl --context="${HOST_CLUSTER_NAME}" apply -f "${TEMP_PATH}"/karmada-webhook-cert-secret-tmp.yaml
kubectl --context="${HOST_CLUSTER_NAME}" apply -f "${TEMP_PATH}"/karmada-controller-manager-cert-secret-tmp.yaml
rm -rf "${TEMP_PATH}"
}

102 changes: 102 additions & 0 deletions pkg/replicassyncer/controller.go
@@ -0,0 +1,102 @@
package replicassyncer

import (
"context"

autoscalingv1 "k8s.io/api/autoscaling/v1"
autoscalingv2 "k8s.io/api/autoscaling/v2"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/scale"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/source"
)

type ReplicasSyncer struct {
hostClient client.Client
mapper meta.RESTMapper
scaleClient scale.ScalesGetter

GenericChan chan<- event.GenericEvent
}

func NewReplicasSyncer(hostClient client.Client, mapper meta.RESTMapper, scaleClient scale.ScalesGetter) *ReplicasSyncer {
return &ReplicasSyncer{
hostClient: hostClient,
mapper: mapper,
scaleClient: scaleClient,
}
}

func (r *ReplicasSyncer) SetupWithManager(mgr controllerruntime.Manager) error {
ch := make(chan event.GenericEvent)
r.GenericChan = ch
return controllerruntime.NewControllerManagedBy(mgr).Named("replicas-syncer").
For(&autoscalingv2.HorizontalPodAutoscaler{},
builder.WithPredicates(HPAPredicate)).
WatchesRawSource(&source.Channel{Source: ch}, &handler.EnqueueRequestForObject{}, builder.WithPredicates(HPAPredicate)).
Complete(r)
}

func (r *ReplicasSyncer) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) {
klog.V(4).Infof("Reconciling for HPA %s/%s", req.Namespace, req.Name)

hpa := &autoscalingv2.HorizontalPodAutoscaler{}
err := r.hostClient.Get(ctx, req.NamespacedName, hpa)
if err != nil {
if errors.IsNotFound(err) {
return controllerruntime.Result{}, nil
}

return controllerruntime.Result{}, err
}

workloadGR, scale, err := GetGroupResourceAndScaleForWorkloadFromHPA(ctx, r.mapper, r.scaleClient, hpa)
if err != nil {
return controllerruntime.Result{}, err
}

// If the scale subresource of the workload is not found, skip processing.
if scale == nil {
klog.Infof("Scale of resource(kind=%s, %s/%s) not found, the resource might have been removed, skip",
hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name)

return controllerruntime.Result{}, nil
}

err = r.updateScaleIfNeed(ctx, workloadGR, scale, hpa)
if err != nil {
return controllerruntime.Result{}, err
}

return controllerruntime.Result{}, nil
}

// updateScaleIfNeed updates the scale of the workload on the federated control plane
// if the replicas declared in the workload on the federated control plane do not match
// the actual replicas in member clusters as adjusted by the HPA.
func (r *ReplicasSyncer) updateScaleIfNeed(ctx context.Context, workloadGR schema.GroupResource, scale *autoscalingv1.Scale, hpa *autoscalingv2.HorizontalPodAutoscaler) error {
if scale.Spec.Replicas != hpa.Status.CurrentReplicas {
oldReplicas := scale.Spec.Replicas

scale.Spec.Replicas = hpa.Status.CurrentReplicas
_, err := r.scaleClient.Scales(hpa.Namespace).Update(ctx, workloadGR, scale, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("Failed to sync scale for resource(kind=%s, %s/%s) from %d to %d: %v",
hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, oldReplicas, hpa.Status.CurrentReplicas, err)
return err
}

klog.Infof("Successfully synced scale for resource(kind=%s, %s/%s) from %d to %d",
hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, oldReplicas, hpa.Status.CurrentReplicas)
}

return nil
}
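
Reconcile delegates target resolution to GetGroupResourceAndScaleForWorkloadFromHPA, which is part of this commit but not rendered on this page. Its job is to turn the HPA's scaleTargetRef into a schema.GroupResource and fetch the live /scale subresource, returning a nil scale when the target no longer exists. Below is a sketch of how that resolution typically works with a RESTMapper and a scale client; the signature follows the call site above, but the body is illustrative rather than the commit's code.

package replicassyncer

import (
	"context"

	autoscalingv1 "k8s.io/api/autoscaling/v1"
	autoscalingv2 "k8s.io/api/autoscaling/v2"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/scale"
)

func GetGroupResourceAndScaleForWorkloadFromHPA(ctx context.Context, mapper meta.RESTMapper,
	scales scale.ScalesGetter, hpa *autoscalingv2.HorizontalPodAutoscaler,
) (schema.GroupResource, *autoscalingv1.Scale, error) {
	ref := hpa.Spec.ScaleTargetRef
	gv, err := schema.ParseGroupVersion(ref.APIVersion)
	if err != nil {
		return schema.GroupResource{}, nil, err
	}
	// Map the target's kind (e.g. Deployment) to its resource (deployments).
	mapping, err := mapper.RESTMapping(gv.WithKind(ref.Kind).GroupKind(), gv.Version)
	if err != nil {
		return schema.GroupResource{}, nil, err
	}
	gr := mapping.Resource.GroupResource()
	// Fetch the /scale subresource of the target workload.
	s, err := scales.Scales(hpa.Namespace).Get(ctx, gr, ref.Name, metav1.GetOptions{})
	if apierrors.IsNotFound(err) {
		// Target is gone; the caller treats a nil scale as "skip".
		return gr, nil, nil
	}
	if err != nil {
		return gr, nil, err
	}
	return gr, s, nil
}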
3 changes: 3 additions & 0 deletions pkg/replicassyncer/controller_test.go
@@ -0,0 +1,3 @@
package replicassyncer

// TODO(@lxtywypc): Add unit test.
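
A possible starting point for that TODO: a happy-path test of updateScaleIfNeed against a stub ScalesGetter. Everything below (the stub type and the test) is an illustrative sketch, not part of this commit.

package replicassyncer

import (
	"context"
	"testing"

	autoscalingv1 "k8s.io/api/autoscaling/v1"
	autoscalingv2 "k8s.io/api/autoscaling/v2"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/scale"
)

// stubScales records the last scale written through it.
type stubScales struct{ updated *autoscalingv1.Scale }

func (s *stubScales) Scales(string) scale.ScaleInterface { return s }

func (s *stubScales) Get(_ context.Context, _ schema.GroupResource, _ string, _ metav1.GetOptions) (*autoscalingv1.Scale, error) {
	return nil, nil
}

func (s *stubScales) Update(_ context.Context, _ schema.GroupResource, sc *autoscalingv1.Scale, _ metav1.UpdateOptions) (*autoscalingv1.Scale, error) {
	s.updated = sc
	return sc, nil
}

func (s *stubScales) Patch(_ context.Context, _ schema.GroupVersionResource, _ string, _ types.PatchType, _ []byte, _ metav1.PatchOptions) (*autoscalingv1.Scale, error) {
	return nil, nil
}

func TestUpdateScaleIfNeed(t *testing.T) {
	stub := &stubScales{}
	syncer := NewReplicasSyncer(nil, nil, stub)

	hpa := &autoscalingv2.HorizontalPodAutoscaler{
		Status: autoscalingv2.HorizontalPodAutoscalerStatus{CurrentReplicas: 3},
	}
	sc := &autoscalingv1.Scale{Spec: autoscalingv1.ScaleSpec{Replicas: 1}}
	gr := schema.GroupResource{Group: "apps", Resource: "deployments"}

	if err := syncer.updateScaleIfNeed(context.TODO(), gr, sc, hpa); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if stub.updated == nil || stub.updated.Spec.Replicas != 3 {
		t.Fatalf("expected scale updated to 3 replicas, got %+v", stub.updated)
	}
}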