From 3d8d46d60d4b2dc204d17cd3783f17466926aae0 Mon Sep 17 00:00:00 2001 From: Leo Ryu Date: Tue, 26 Jul 2022 16:42:06 +0800 Subject: [PATCH] feat(platform): controller support crd mode (#2026) --- api/platform/types.go | 2 + api/platform/v1/types.go | 2 + .../app/options/clustercontroller.go | 8 +++ pkg/controller/config/client.go | 14 ++++-- pkg/controller/options/apiclient.go | 7 +-- .../controller/cluster/cluster_controller.go | 49 ++++++++++++++++--- .../controller/cluster/config/types.go | 2 + 7 files changed, 69 insertions(+), 15 deletions(-) diff --git a/api/platform/types.go b/api/platform/types.go index 02414b6c8..f4e6ded0c 100644 --- a/api/platform/types.go +++ b/api/platform/types.go @@ -113,6 +113,8 @@ const ( AnywhereLocalizationsAnno = "tkestack.io/anywhere-localizations" // AnywhereMachinesAnno contains base64 machines json data AnywhereMachinesAnno = "tkestack.io/anywhere-machines" + // ClusterNameLable contains related cluster's name for no-cluster resources + ClusterNameLable = "tkestack.io/cluster-name" ) // KubeVendorType describe the kubernetes provider of the cluster diff --git a/api/platform/v1/types.go b/api/platform/v1/types.go index 8a7e76a21..f13a8339f 100644 --- a/api/platform/v1/types.go +++ b/api/platform/v1/types.go @@ -124,6 +124,8 @@ const ( AnywhereLocalizationsAnno = "tkestack.io/anywhere-localizations" // AnywhereMachinesAnno contains base64 machines json data AnywhereMachinesAnno = "tkestack.io/anywhere-machines" + // ClusterNameLable contains related cluster's name for no-cluster resources + ClusterNameLable = "tkestack.io/cluster-name" ) // KubeVendorType describe the kubernetes provider of the cluster diff --git a/cmd/tke-platform-controller/app/options/clustercontroller.go b/cmd/tke-platform-controller/app/options/clustercontroller.go index 58f8701af..6b13a0dd2 100644 --- a/cmd/tke-platform-controller/app/options/clustercontroller.go +++ b/cmd/tke-platform-controller/app/options/clustercontroller.go @@ -33,6 +33,7 @@ const ( flagUpperLimitofRandomHealthCheckPeriod = "upper-limit-random-healthcheck-period" flagClusterRateLimiterLimit = "cluster-rate-limiter-limit" flagClusterRateLimiterBurst = "cluster-rate-limiter-burst" + flagClusterIsCRDMode = "cluster-is-crd-mode" ) const ( @@ -43,6 +44,7 @@ const ( configUpperLimitofRandomHealthCheckPeriod = "controller.upper-limit-random-healthcheck-period" configClusterRateLimiterLimit = "controller.cluster_rate_limiter_limit" configClusterRateLimiterBurst = "controller.cluster_rate_limiter_burst" + configClusterIsCRDMode = "controller.cluster_is_crd_mode" ) // ClusterControllerOptions holds the ClusterController options. @@ -61,6 +63,7 @@ func NewClusterControllerOptions() *ClusterControllerOptions { RandomeRangeUpperLimitForHealthCheckPeriod: defaultRandomeRangeUpperLimitForHealthCheckPeriod, BucketRateLimiterLimit: defaultBucketRateLimiterLimit, BucketRateLimiterBurst: defaultBucketRateLimiterBurst, + IsCRDMode: false, }, } } @@ -91,6 +94,9 @@ func (o *ClusterControllerOptions) AddFlags(fs *pflag.FlagSet) { fs.IntVar(&o.BucketRateLimiterBurst, flagClusterRateLimiterBurst, o.BucketRateLimiterBurst, "The number of bursts of at most b tokens.") _ = viper.BindPFlag(configClusterRateLimiterBurst, fs.Lookup(flagClusterRateLimiterBurst)) + + fs.BoolVar(&o.IsCRDMode, flagClusterIsCRDMode, o.IsCRDMode, "Whether the controller is using CRD mode") + _ = viper.BindPFlag(configClusterIsCRDMode, fs.Lookup(flagClusterIsCRDMode)) } // ApplyTo fills up ClusterController config with options. @@ -106,6 +112,7 @@ func (o *ClusterControllerOptions) ApplyTo(cfg *clusterconfig.ClusterControllerC cfg.RandomeRangeUpperLimitForHealthCheckPeriod = o.RandomeRangeUpperLimitForHealthCheckPeriod cfg.BucketRateLimiterLimit = o.BucketRateLimiterLimit cfg.BucketRateLimiterBurst = o.BucketRateLimiterBurst + cfg.IsCRDMode = o.IsCRDMode return nil } @@ -130,5 +137,6 @@ func (o *ClusterControllerOptions) ApplyFlags() []error { o.RandomeRangeUpperLimitForHealthCheckPeriod = viper.GetDuration(configUpperLimitofRandomHealthCheckPeriod) o.BucketRateLimiterLimit = viper.GetInt(configClusterRateLimiterLimit) o.BucketRateLimiterBurst = viper.GetInt(configClusterRateLimiterBurst) + o.IsCRDMode = viper.GetBool(configClusterIsCRDMode) return nil } diff --git a/pkg/controller/config/client.go b/pkg/controller/config/client.go index 1b6deec51..3df2b225f 100644 --- a/pkg/controller/config/client.go +++ b/pkg/controller/config/client.go @@ -20,6 +20,8 @@ package config import ( "fmt" + + "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "tkestack.io/tke/pkg/controller/options" @@ -29,9 +31,15 @@ import ( func BuildClientConfig(opts *options.APIServerClientOptions) (cfg *restclient.Config, ok bool, err error) { if opts.Required { if opts.Server == "" && opts.ServerClientConfig == "" { - err = fmt.Errorf("either %s or %s should be specified", - options.FlagAPIClientServer(opts.Name), - options.FlagAPIClientServerClientConfig(opts.Name)) + cfg, err = rest.InClusterConfig() + if err != nil { + err = fmt.Errorf("%s or %s is not specified, try to use in cluster config failed %v", + options.FlagAPIClientServer(opts.Name), + options.FlagAPIClientServerClientConfig(opts.Name), + err) + } else { + ok = true + } return } } diff --git a/pkg/controller/options/apiclient.go b/pkg/controller/options/apiclient.go index 8f722b061..52d752e7a 100644 --- a/pkg/controller/options/apiclient.go +++ b/pkg/controller/options/apiclient.go @@ -20,6 +20,7 @@ package options import ( "fmt" + "github.com/spf13/pflag" "github.com/spf13/viper" ) @@ -113,12 +114,6 @@ func (o *APIServerClientOptions) ApplyFlags() []error { o.QPS = float32(viper.GetFloat64(o.configAPIClientQPS)) o.ContentType = viper.GetString(o.configAPIClientContentType) - if o.Required { - if o.ServerClientConfig == "" && o.Server == "" { - errs = append(errs, fmt.Errorf("must specify either `%s` or `%s`", FlagAPIClientServer(o.Name), FlagAPIClientServerClientConfig(o.Name))) - } - } - return errs } diff --git a/pkg/platform/controller/cluster/cluster_controller.go b/pkg/platform/controller/cluster/cluster_controller.go index 0d568fc81..285fa7e81 100644 --- a/pkg/platform/controller/cluster/cluster_controller.go +++ b/pkg/platform/controller/cluster/cluster_controller.go @@ -71,6 +71,7 @@ type Controller struct { healthCheckPeriod time.Duration randomeRangeLowerLimitForHealthCheckPeriod time.Duration randomeRangeUpperLimitForHealthCheckPeriod time.Duration + isCRDMode bool } // NewController creates a new Controller object. @@ -88,6 +89,7 @@ func NewController( platformClient, finalizerToken, true), + isCRDMode: configuration.IsCRDMode, } rateLimit := workqueue.NewMaxOfRateLimiter( workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second), @@ -345,6 +347,10 @@ func (c *Controller) reconcile(ctx context.Context, key string, cluster *platfor var err error switch cluster.Status.Phase { + // empty string is for crd without mutating webhook + case "": + cluster.Status.Phase = platformv1.ClusterInitializing + err = c.onCreate(ctx, cluster) case platformv1.ClusterInitializing: err = c.onCreate(ctx, cluster) case platformv1.ClusterRunning, platformv1.ClusterFailed: @@ -389,7 +395,18 @@ func (c *Controller) onCreate(ctx context.Context, cluster *platformv1.Cluster) if err != nil { // Update status, ignore failure _, _ = c.platformClient.ClusterCredentials().Update(ctx, clusterWrapper.ClusterCredential, metav1.UpdateOptions{}) - _, _ = c.platformClient.Clusters().Update(ctx, clusterWrapper.Cluster, metav1.UpdateOptions{}) + updatedCls, _ := c.platformClient.Clusters().Update(ctx, clusterWrapper.Cluster, metav1.UpdateOptions{}) + // if using crd, cluster status cannot be updated through update cluster + if c.isCRDMode { + var clsStatus *platformv1.Cluster + if updatedCls == nil { + clsStatus = clusterWrapper.Cluster + } else { + clsStatus = updatedCls + clsStatus.Status = clusterWrapper.Cluster.Status + } + _, _ = c.platformClient.Clusters().UpdateStatus(ctx, clsStatus, metav1.UpdateOptions{}) + } return err } clusterWrapper.ClusterCredential, err = c.platformClient.ClusterCredentials().Update(ctx, clusterWrapper.ClusterCredential, metav1.UpdateOptions{}) @@ -397,10 +414,18 @@ func (c *Controller) onCreate(ctx context.Context, cluster *platformv1.Cluster) return err } clusterWrapper.RegisterRestConfig(clusterWrapper.ClusterCredential.RESTConfig(cluster)) - clusterWrapper.Cluster, err = c.platformClient.Clusters().Update(ctx, clusterWrapper.Cluster, metav1.UpdateOptions{}) + cls, err := c.platformClient.Clusters().Update(ctx, clusterWrapper.Cluster, metav1.UpdateOptions{}) if err != nil { return err } + // if using crd, cluster status cannot be updated through update cluster + if c.isCRDMode { + cls.Status = clusterWrapper.Cluster.Status + clusterWrapper.Cluster, err = c.platformClient.Clusters().UpdateStatus(ctx, cls, metav1.UpdateOptions{}) + if err != nil { + return err + } + } } return nil @@ -487,10 +512,20 @@ func (c *Controller) ensureCreateClusterCredential(ctx context.Context, cluster } // TODO use informer search by labels. - fieldSelector := fields.OneTermEqualSelector("clusterName", cluster.Name).String() - clustercredentials, err := c.platformClient.ClusterCredentials().List(ctx, metav1.ListOptions{FieldSelector: fieldSelector}) - if err != nil { - return nil, err + var clustercredentials *platformv1.ClusterCredentialList + var err error + if c.isCRDMode { + labelSelector := fields.OneTermEqualSelector(platformv1.ClusterNameLable, cluster.Name).String() + clustercredentials, err = c.platformClient.ClusterCredentials().List(ctx, metav1.ListOptions{LabelSelector: labelSelector}) + if err != nil { + return nil, err + } + } else { + fieldSelector := fields.OneTermEqualSelector("clusterName", cluster.Name).String() + clustercredentials, err = c.platformClient.ClusterCredentials().List(ctx, metav1.ListOptions{FieldSelector: fieldSelector}) + if err != nil { + return nil, err + } } // [Idempotent] if not found cluster credentials, create one for next logic @@ -500,6 +535,8 @@ func (c *Controller) ensureCreateClusterCredential(ctx context.Context, cluster TenantID: cluster.Spec.TenantID, ClusterName: cluster.Name, ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{platformv1.ClusterNameLable: cluster.Name}, + GenerateName: "cc-", OwnerReferences: []metav1.OwnerReference{ *metav1.NewControllerRef(cluster, platformv1.SchemeGroupVersion.WithKind("Cluster"))}, }, diff --git a/pkg/platform/controller/cluster/config/types.go b/pkg/platform/controller/cluster/config/types.go index 16b8801e6..1fb67c08e 100644 --- a/pkg/platform/controller/cluster/config/types.go +++ b/pkg/platform/controller/cluster/config/types.go @@ -39,4 +39,6 @@ type ClusterControllerConfiguration struct { BucketRateLimiterLimit int // BucketRateLimiterBurst bursts of at most b tokens. BucketRateLimiterBurst int + // IsCRDMode Whether the controller is using CRD mode + IsCRDMode bool }