diff --git a/cmd/api/api.go b/cmd/api/api.go index c2c4f91342f5..421944218366 100644 --- a/cmd/api/api.go +++ b/cmd/api/api.go @@ -79,7 +79,7 @@ func NewAPICmd() *cobra.Command { func (c *command) start() (err error) { // Single kube client for whole lifetime of the API - c.client, err = kubeutil.NewClient(c.K0sVars.AdminKubeConfigPath) + c.client, err = kubeutil.NewClientFromFile(c.K0sVars.AdminKubeConfigPath) if err != nil { return err } diff --git a/cmd/controller/controller.go b/cmd/controller/controller.go index 61cdde617bba..ce744c7bcdde 100644 --- a/cmd/controller/controller.go +++ b/cmd/controller/controller.go @@ -38,6 +38,7 @@ import ( "github.com/k0sproject/k0s/pkg/component/controller" "github.com/k0sproject/k0s/pkg/component/controller/clusterconfig" "github.com/k0sproject/k0s/pkg/component/controller/leaderelector" + "github.com/k0sproject/k0s/pkg/component/controller/workerconfig" "github.com/k0sproject/k0s/pkg/component/manager" "github.com/k0sproject/k0s/pkg/component/prober" "github.com/k0sproject/k0s/pkg/component/status" @@ -429,7 +430,12 @@ func (c *command) start(ctx context.Context) error { c.ClusterComponents.Add(ctx, metrics) } - if !slices.Contains(c.DisableComponents, constant.KubeletConfigComponentName) { + if !slices.Contains(c.DisableComponents, constant.WorkerConfigComponentName) { + reconciler, err := workerconfig.NewReconciler(c.K0sVars, c.NodeConfig.Spec, adminClientFactory, leaderElector) + if err != nil { + return err + } + c.ClusterComponents.Add(ctx, reconciler) c.ClusterComponents.Add(ctx, controller.NewKubeletConfig(c.K0sVars, adminClientFactory)) } diff --git a/cmd/worker/worker.go b/cmd/worker/worker.go index dfb4834b0be8..80a66e21f128 100644 --- a/cmd/worker/worker.go +++ b/cmd/worker/worker.go @@ -31,7 +31,9 @@ import ( "github.com/k0sproject/k0s/pkg/component/prober" "github.com/k0sproject/k0s/pkg/component/status" "github.com/k0sproject/k0s/pkg/component/worker" + workerconfig "github.com/k0sproject/k0s/pkg/component/worker/config" "github.com/k0sproject/k0s/pkg/config" + "github.com/k0sproject/k0s/pkg/kubernetes" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -106,7 +108,12 @@ func (c *Command) Start(ctx context.Context) error { return err } - kubeletConfigClient, err := worker.LoadKubeletConfigClient(c.K0sVars) + kubeletKubeconfigPath := c.K0sVars.KubeletAuthConfigPath + workerConfig, err := workerconfig.LoadProfile( + ctx, + kubernetes.KubeconfigFromFile(kubeletKubeconfigPath), + c.WorkerProfile, + ) if err != nil { return err } @@ -132,9 +139,9 @@ func (c *Command) Start(ctx context.Context) error { CRISocket: c.CriSocket, EnableCloudProvider: c.CloudProvider, K0sVars: c.K0sVars, - KubeletConfigClient: kubeletConfigClient, + Kubeconfig: kubeletKubeconfigPath, + Configuration: *workerConfig.KubeletConfiguration.DeepCopy(), LogLevel: c.Logging["kubelet"], - Profile: c.WorkerProfile, Labels: c.Labels, Taints: c.Taints, ExtraArgs: c.KubeletExtraArgs, @@ -158,7 +165,7 @@ func (c *Command) Start(ctx context.Context) error { }) } - certManager := worker.NewCertificateManager(ctx, c.K0sVars.KubeletAuthConfigPath) + certManager := worker.NewCertificateManager(ctx, kubeletKubeconfigPath) if !c.SingleNode && !c.EnableWorker { clusterConfig, err := config.LoadClusterConfig(c.K0sVars) if err != nil { @@ -202,7 +209,7 @@ func (c *Command) Start(ctx context.Context) error { // Stop components if err := componentManager.Stop(); err != nil { - logrus.WithError(err).Error("error while stoping component manager") + logrus.WithError(err).Error("error 
while stopping component manager") } return nil } diff --git a/docs/configuration.md b/docs/configuration.md index aa692e1ee185..b2955f46fae0 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -243,18 +243,23 @@ CALICO_IPV6POOL_CIDR: "{{ spec.network.dualStack.IPv6podCIDR }}" ### `spec.workerProfiles` -Worker profiles are used to set kubelet parameters can for a worker. Each worker profile is then used to generate a config map containing a custom `kubelet.config.k8s.io` object. +Worker profiles are used to manage worker-specific configuration in a +centralized manner. A ConfigMap is generated for each worker profile. Based on +the `--profile` argument given to the `k0s worker`, the configuration in the +corresponding ConfigMap is is picked up during startup. -For a list of possible kubelet configuration keys: [go here](https://kubernetes.io/docs/reference/config-api/kubelet-config.v1beta1/). +The worker profiles are defined as an array. Each element has following +properties: -The worker profiles are defined as an array of `spec.workerProfiles.workerProfile`. Each element has following properties: +| Property | Description | +| -------- | -------------------------------------------------------------------------------- | +| `name` | String; name to use as profile selector for the worker process | +| `values` | Object; [Kubelet configuration][kubelet-config] overrides, see below for details | -| Property | Description | -| -------- | -------------------------------------------------------------- | -| `name` | String; name to use as profile selector for the worker process | -| `values` | Mapping object | +#### `spec.workerProfiles[].values` (Kubelet configuration overrides) -For each profile, the control plane creates a separate ConfigMap with `kubelet-config yaml`. Based on the `--profile` argument given to the `k0s worker`, the corresponding ConfigMap is used to extract the `kubelet-config.yaml` file. `values` are recursively merged with default `kubelet-config.yaml` +The Kubelet configuration overrides of a profile override the defaults defined +by k0s. Note that there are several fields that cannot be overridden: @@ -264,6 +269,8 @@ Note that there are several fields that cannot be overridden: - `kind` - `staticPodURL` +[kubelet-config]: https://kubernetes.io/docs/reference/config-api/kubelet-config.v1beta1/ + #### Examples ##### Feature Gates @@ -403,21 +410,30 @@ spec: ## Disabling controller components -k0s allows completely disabling some of the system components. This allows the user to build a minimal Kubernetes control plane and use what ever components they need to fullfill their need for the controlplane. Disabling the system components happens through a commandline flag for the controller process: +k0s allows to completely disable some of the system components. This allows +users to build a minimal Kubernetes control plane and use what ever components +they need to fulfill their need for the control plane. 
Disabling the system +components happens through a command line flag for the controller process: ```sh ---disable-components strings disable components (valid items: api-config,autopilot,control-api,coredns,csr-approver,endpoint-reconciler,helm,konnectivity-server,kube-controller-manager,kube-proxy,kube-scheduler,kubelet-config,metrics-server,network-provider,node-role,system-rbac) - +--disable-components strings disable components (valid items: api-config,autopilot,control-api,coredns,csr-approver,endpoint-reconciler,helm,konnectivity-server,kube-controller-manager,kube-proxy,kube-scheduler,metrics-server,network-provider,node-role,system-rbac,worker-config) ``` -If you use k0sctl just add the flag when installing the cluster for the first controller at `spec.hosts.installFlags` in the config file like e.g.: +**Note:** As of k0s 1.26, the kubelet-config component has been replaced by the +worker-config component. k0s will issue a warning when the old component name is +being used. It is scheduled for removal in k0s 1.27. Please update to the new +component name. + +If you use k0sctl, just add the flag when installing the cluster for the first +controller at `spec.hosts.installFlags` in the config file like e.g.: ```yaml spec: hosts: - role: controller installFlags: - - --disable-components metrics-server + - --disable-components=metrics-server ``` -As seen from the component list, the only always-on component is the Kubernetes api-server, without that k0s serves no purpose. +As seen from the component list, the only always-on component is the Kubernetes +api-server, without that k0s serves no purpose. diff --git a/docs/worker-node-config.md b/docs/worker-node-config.md index c7ae44a57afc..05316bf9734a 100644 --- a/docs/worker-node-config.md +++ b/docs/worker-node-config.md @@ -46,17 +46,25 @@ controller0 [map[effect:NoSchedule key:node-role.kubernetes.io/master]] worker0 ``` -## Kubelet args +## Kubelet configuration -The `k0s worker` command accepts a generic flag to pass in any set of arguments for kubelet process. +The `k0s worker` command accepts a generic flag to pass in any set of arguments +for kubelet process. -For example, running `k0s worker --token-file=k0s.token --kubelet-extra-args="--node-ip=1.2.3.4 --address=0.0.0.0"` passes in the given flags to kubelet as-is. As such, you must confirm that any flags you are passing in are properly formatted and valued as k0s will not validate those flags. +For example, running `k0s worker --token-file=k0s.token +--kubelet-extra-args="--node-ip=1.2.3.4 --address=0.0.0.0"` passes in the given +flags to Kubelet as-is. As such, you must confirm that any flags you are passing +in are properly formatted and valued as k0s will not validate those flags. -## Worker Profiles +### Worker Profiles -kubelet parameters can also be set via a worker profile. Worker profiles are defined in the main k0s.yaml and are used to generate a config map containing a custom `kubelet.config.k8s.io` object. -To see examples of k0s.yaml containing worker profiles: [go here](./configuration.md#specworkerprofiles). -For a list of possible kubelet configuration keys: [go here](https://kubernetes.io/docs/reference/config-api/kubelet-config.v1beta1/). +Kubelet configuration fields can also be set via a worker profiles. Worker +profiles are defined in the main k0s.yaml and are used to generate ConfigMaps +containing a custom `kubelet.config.k8s.io/v1beta1/KubeletConfiguration` object. 
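+For illustration, a minimal profile could look like the following snippet (the
+profile name and the `maxPods` value are just placeholders):
+
+```yaml
+spec:
+  workerProfiles:
+    - name: custom-profile
+      values:
+        maxPods: 200
+```
+
+A worker would then pick this profile up when started with
+`k0s worker --profile custom-profile`.
+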
+To see examples of k0s.yaml containing worker profiles: [go +here](./configuration.md#specworkerprofiles). For a list of possible Kubelet +configuration fields: [go +here](https://kubernetes.io/docs/reference/config-api/kubelet-config.v1beta1/). ## IPTables Mode diff --git a/inttest/disabledcomponents/disabled_components_test.go b/inttest/disabledcomponents/disabled_components_test.go index f4c9ff7fae5b..e2a432cad57a 100644 --- a/inttest/disabledcomponents/disabled_components_test.go +++ b/inttest/disabledcomponents/disabled_components_test.go @@ -32,7 +32,7 @@ type DisabledComponentsSuite struct { func (s *DisabledComponentsSuite) TestK0sGetsUp() { - s.NoError(s.InitController(0, "--disable-components konnectivity-server,kube-scheduler,kube-controller-manager,control-api,csr-approver,kube-proxy,coredns,network-provider,helm,metrics-server,kubelet-config,system-rbac")) + s.NoError(s.InitController(0, "--disable-components control-api,coredns,csr-approver,helm,konnectivity-server,kube-controller-manager,kube-proxy,kube-scheduler,metrics-server,network-provider,system-rbac,worker-config")) kc, err := s.KubeClient(s.ControllerNode(0)) s.Require().NoError(err) diff --git a/pkg/component/controller/clusterConfig.go b/pkg/component/controller/clusterConfig.go index 8702dbc0bfcb..45f9a116431e 100644 --- a/pkg/component/controller/clusterConfig.go +++ b/pkg/component/controller/clusterConfig.go @@ -22,7 +22,7 @@ import ( "os" "time" - cfgClient "github.com/k0sproject/k0s/pkg/apis/k0s.k0sproject.io/clientset/typed/k0s.k0sproject.io/v1beta1" + k0sclient "github.com/k0sproject/k0s/pkg/apis/k0s.k0sproject.io/clientset/typed/k0s.k0sproject.io/v1beta1" "github.com/k0sproject/k0s/pkg/apis/k0s.k0sproject.io/v1beta1" "github.com/k0sproject/k0s/pkg/component/controller/clusterconfig" "github.com/k0sproject/k0s/pkg/component/controller/leaderelector" @@ -47,8 +47,7 @@ type ClusterConfigReconciler struct { ComponentManager *manager.Manager KubeClientFactory kubeutil.ClientFactoryInterface - configClient cfgClient.ClusterConfigInterface - kubeConfig string + configClient k0sclient.ClusterConfigInterface leaderElector leaderelector.Interface log *logrus.Entry saver manifestsSaver @@ -72,7 +71,6 @@ func NewClusterConfigReconciler(leaderElector leaderelector.Interface, k0sVars c ComponentManager: mgr, YamlConfig: cfg, KubeClientFactory: kubeClientFactory, - kubeConfig: k0sVars.AdminKubeConfigPath, leaderElector: leaderElector, log: logrus.WithFields(logrus.Fields{"component": "clusterConfig-reconciler"}), saver: s, diff --git a/pkg/component/controller/kubeletconfig.go b/pkg/component/controller/kubeletconfig.go index 50746b36aec3..ae700bbe65a3 100644 --- a/pkg/component/controller/kubeletconfig.go +++ b/pkg/component/controller/kubeletconfig.go @@ -180,6 +180,15 @@ func (k *KubeletConfig) save(data []byte) error { if err := file.WriteContentAtomically(filePath, data, constant.CertMode); err != nil { return fmt.Errorf("can't write kubelet configuration config map: %v", err) } + + deprecationNotice := []byte(`The kubelet-config component has been replaced by the worker-config component in k0s 1.26. +It is scheduled for removal in k0s 1.27. 
+`) + + if err := file.WriteContentAtomically(filepath.Join(kubeletDir, "deprecated.txt"), deprecationNotice, constant.CertMode); err != nil { + k.log.WithError(err).Warn("Failed to write deprecation notice") + } + return nil } @@ -257,6 +266,12 @@ kind: ConfigMap metadata: name: {{.Name}} namespace: kube-system + labels: + k0s.k0sproject.io/deprecated-since: "1.26" + annotations: + k0s.k0sproject.io/deprecated: | + The kubelet-config component has been replaced by the worker-config component in k0s 1.26. + It is scheduled for removal in k0s 1.27. data: kubelet: | {{ .KubeletConfigYAML | nindent 4 }} @@ -268,6 +283,12 @@ kind: Role metadata: name: system:bootstrappers:kubelet-configmaps namespace: kube-system + labels: + k0s.k0sproject.io/deprecated-since: "1.26" + annotations: + k0s.k0sproject.io/deprecated: | + The kubelet-config component has been replaced by the worker-config component in k0s 1.26. + It is scheduled for removal in k0s 1.27. rules: - apiGroups: [""] resources: ["configmaps"] @@ -282,6 +303,12 @@ kind: RoleBinding metadata: name: system:bootstrappers:kubelet-configmaps namespace: kube-system + labels: + k0s.k0sproject.io/deprecated-since: "1.26" + annotations: + k0s.k0sproject.io/deprecated: | + The kubelet-config component has been replaced by the worker-config component in k0s 1.26. + It is scheduled for removal in k0s 1.27. roleRef: apiGroup: rbac.authorization.k8s.io kind: Role diff --git a/pkg/component/controller/kubeletconfig_test.go b/pkg/component/controller/kubeletconfig_test.go index 59c369a9f8e4..7644af3ca64e 100644 --- a/pkg/component/controller/kubeletconfig_test.go +++ b/pkg/component/controller/kubeletconfig_test.go @@ -30,9 +30,8 @@ import ( "github.com/stretchr/testify/require" ) -var k0sVars = constant.GetConfig("") - func Test_KubeletConfig(t *testing.T) { + k0sVars := constant.GetConfig(t.TempDir()) dnsAddr, _ := cfg.Spec.Network.DNSAddress() t.Run("default_profile_only", func(t *testing.T) { k := NewKubeletConfig(k0sVars, testutil.NewFakeClientFactory()) @@ -118,6 +117,7 @@ func Test_KubeletConfig(t *testing.T) { } func defaultConfigWithUserProvidedProfiles(t *testing.T) *KubeletConfig { + k0sVars := constant.GetConfig(t.TempDir()) k := NewKubeletConfig(k0sVars, testutil.NewFakeClientFactory()) cfgProfileX := map[string]interface{}{ diff --git a/pkg/component/controller/workerconfig/reconciler.go b/pkg/component/controller/workerconfig/reconciler.go new file mode 100644 index 000000000000..18ff3e18203b --- /dev/null +++ b/pkg/component/controller/workerconfig/reconciler.go @@ -0,0 +1,535 @@ +/* +Copyright 2022 k0s authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package workerconfig + +import ( + "context" + "errors" + "fmt" + "net" + "reflect" + "sort" + "strings" + "sync" + "time" + + "github.com/k0sproject/k0s/pkg/apis/k0s.k0sproject.io/v1beta1" + "github.com/k0sproject/k0s/pkg/applier" + "github.com/k0sproject/k0s/pkg/component/controller/leaderelector" + "github.com/k0sproject/k0s/pkg/component/manager" + workerconfig "github.com/k0sproject/k0s/pkg/component/worker/config" + "github.com/k0sproject/k0s/pkg/constant" + kubeutil "github.com/k0sproject/k0s/pkg/kubernetes" + + corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + kubeletv1beta1 "k8s.io/kubelet/config/v1beta1" + "k8s.io/utils/pointer" + + "github.com/sirupsen/logrus" + "golang.org/x/exp/slices" + "sigs.k8s.io/yaml" +) + +type resources = []*unstructured.Unstructured + +// Reconciler maintains ConfigMaps that hold configuration to be +// used on k0s worker nodes, depending on their selected worker profile. +type Reconciler struct { + log logrus.FieldLogger + + clusterDomain string + clusterDNSIP net.IP + clientFactory kubeutil.ClientFactoryInterface + leaderElector leaderelector.Interface + + mu sync.Mutex + state reconcilerState + + // valid when initialized + apply func(context.Context, resources) error + + // valid when started + updates chan<- updateFunc + requestStop func() + stopped <-chan struct{} +} + +var ( + _ manager.Component = (*Reconciler)(nil) + _ manager.Reconciler = (*Reconciler)(nil) +) + +type reconcilerState string + +var ( + reconcilerCreated reconcilerState = "created" + reconcilerInitialized reconcilerState = "initialized" + reconcilerStarted reconcilerState = "started" + reconcilerStopped reconcilerState = "stopped" +) + +// NewReconciler creates a new reconciler for worker configurations. +func NewReconciler(k0sVars constant.CfgVars, nodeSpec *v1beta1.ClusterSpec, clientFactory kubeutil.ClientFactoryInterface, leaderElector leaderelector.Interface) (*Reconciler, error) { + log := logrus.WithFields(logrus.Fields{"component": "workerconfig.Reconciler"}) + + clusterDNSIPString, err := nodeSpec.Network.DNSAddress() + if err != nil { + return nil, err + } + clusterDNSIP := net.ParseIP(clusterDNSIPString) + if clusterDNSIP == nil { + return nil, fmt.Errorf("not an IP address: %q", clusterDNSIPString) + } + + reconciler := &Reconciler{ + log: log, + + clusterDomain: nodeSpec.Network.ClusterDomain, + clusterDNSIP: clusterDNSIP, + clientFactory: clientFactory, + leaderElector: leaderElector, + + state: reconcilerCreated, + } + + return reconciler, nil +} + +// Init implements [manager.Component]. 
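+// It only wires up the apply function that is later used to push the generated
+// manifests to the cluster; no Kubernetes clients are created until the
+// reconciliation loop actually applies resources.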
+func (r *Reconciler) Init(context.Context) error { + r.mu.Lock() + defer r.mu.Unlock() + + if r.state != reconcilerCreated { + return fmt.Errorf("cannot initialize, not created: %s", r.state) + } + + clientFactory := r.clientFactory + apply := func(ctx context.Context, resources resources) error { + dynamicClient, err := clientFactory.GetDynamicClient() + if err != nil { + return err + } + discoveryClient, err := clientFactory.GetDiscoveryClient() + if err != nil { + return err + } + + return (&applier.Stack{ + Name: "k0s-" + constant.WorkerConfigComponentName, + Client: dynamicClient, + Discovery: discoveryClient, + Resources: resources, + }).Apply(ctx, true) + } + + r.apply = apply + r.state = reconcilerInitialized + + return nil +} + +type updateFunc = func(*snapshot) chan<- error + +// Start implements [manager.Component]. +func (r *Reconciler) Start(context.Context) error { + r.mu.Lock() + defer r.mu.Unlock() + + if r.state != reconcilerInitialized { + return fmt.Errorf("cannot start, not initialized: %s", r.state) + } + + // Setup the updates channel. Updates may be sent via the reconcile() + // method. The reconciliation goroutine will pick them up for processing. + updates := make(chan updateFunc, 1) + + // Setup the reconciliation goroutine. It will read the state changes from + // the update channel and apply those to the desired state. Changes will be + // applied whenever the last reconciled state differs from the desired + // state. + reconcilerCtx, cancelReconciler := context.WithCancel(context.Background()) + stopped := make(chan struct{}) + apply := r.apply + go func() { + defer close(stopped) + defer r.log.Info("Reconciliation loop done") + r.log.Info("Starting reconciliation loop") + r.runReconcileLoop(reconcilerCtx, updates, apply) + }() + + // React to leader elector changes. Enforce a reconciliation whenever the + // lease is acquired. + r.leaderElector.AddAcquiredLeaseCallback(func() { + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancel() + + err := reconcile(ctx, updates, stopped, func(s *snapshot) { + s.serial++ + }) + + // Log any reconciliation errors, but only if they don't indicate + // that the reconciler has been stopped concurrently. + if err != nil && !errors.Is(err, errStoppedConcurrently) { + r.log.WithError(err).Error("Failed to reconcile after having acquired the leader lease") + } + }() + }) + + // Store the started state + r.apply = nil + r.updates = updates + r.requestStop = cancelReconciler + r.stopped = stopped + r.state = reconcilerStarted + + return nil +} + +// runReconcileLoop executes the main reconciliation loop. The loop will exit +// when the context is done. +// +// The loop works as follows: +// - Receive any updates from the channel +// - Apply the updates to the desired state +// - Compare the desired state to the last reconciled state +// - Reconcile the desired state +// - Send back the outcome of the reconciliation to the channel provided by +// the update function +// +// Reconciliation may be skipped if: +// - The desired state hasn't been fully collected yet +// - The leader lease isn't acquired +// - The last applied state is identical to the desired state +// +// Any failed reconciliations will be retried roughly every minute, until they +// succeed. 
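+//
+// As a rough sketch (the real plumbing lives in the reconcile() helper further
+// below), an update is handed to the loop along these lines:
+//
+//	done := make(chan error, 1)
+//	updates <- func(s *snapshot) chan<- error { s.serial++; return done }
+//	err := <-done // outcome of the reconciliation triggered by the update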
+func (r *Reconciler) runReconcileLoop(ctx context.Context, updates <-chan updateFunc, apply func(context.Context, resources) error) { + var desiredState, reconciledState snapshot + + runReconciliation := func() error { + if err := ctx.Err(); err != nil { + return fmt.Errorf("%w while processing reconciliation", errStoppedConcurrently) + } + + if !r.leaderElector.IsLeader() { + r.log.Debug("Skipping reconciliation, not the leader") + return nil + } + + if desiredState.configSnapshot == nil { + r.log.Debug("Skipping reconciliation, snapshot not yet complete") + return nil + } + + if reflect.DeepEqual(&reconciledState, &desiredState) { + r.log.Debug("Skipping reconciliation, nothing changed") + return nil + } + + stateToReconcile := desiredState.DeepCopy() + resources, err := r.generateResources(stateToReconcile) + if err != nil { + return fmt.Errorf("failed to generate resources for worker configuration: %w", err) + } + + r.log.Debug("Updating worker configuration ...") + + err = apply(ctx, resources) + if err != nil { + return fmt.Errorf("failed to apply resources for worker configuration: %w", err) + } + + stateToReconcile.DeepCopyInto(&reconciledState) + + r.log.Info("Worker configuration updated") + return nil + } + + retryTicker := time.NewTicker(60 * time.Second) + defer retryTicker.Stop() + + var lastRecoFailed bool + + for { + select { + case update := <-updates: + done := update(&desiredState) + func() { + defer close(done) + err := runReconciliation() + done <- err + lastRecoFailed = err != nil + }() + + case <-ctx.Done(): + return // stop requested + + case <-retryTicker.C: // Retry failed reconciliations every minute + if lastRecoFailed { + if err := runReconciliation(); err != nil { + r.log.WithError(err).Error("Failed to recover from previously failed reconciliation") + continue + } + + lastRecoFailed = false + } + } + } +} + +// Reconcile implements [manager.Reconciler]. +func (r *Reconciler) Reconcile(ctx context.Context, cluster *v1beta1.ClusterConfig) error { + updates, stopped, err := func() (chan<- updateFunc, <-chan struct{}, error) { + r.mu.Lock() + defer r.mu.Unlock() + if r.state != reconcilerStarted { + return nil, nil, fmt.Errorf("cannot reconcile, not started: %s", r.state) + } + return r.updates, r.stopped, nil + }() + if err != nil { + return err + } + + configSnapshot := takeConfigSnapshot(cluster.Spec) + + return reconcile(ctx, updates, stopped, func(s *snapshot) { + s.configSnapshot = &configSnapshot + }) +} + +var errStoppedConcurrently = errors.New("stopped concurrently") + +// reconcile enqueues the given update and awaits its reconciliation. +func reconcile(ctx context.Context, updates chan<- updateFunc, stopped <-chan struct{}, update func(*snapshot)) error { + recoDone := make(chan error, 1) + + select { + case updates <- func(s *snapshot) chan<- error { update(s); return recoDone }: + break + case <-stopped: + return fmt.Errorf("%w while trying to enqueue state update", errStoppedConcurrently) + case <-ctx.Done(): + return fmt.Errorf("%w while trying to enqueue state update", ctx.Err()) + } + + select { + case err := <-recoDone: + return err + case <-stopped: + return fmt.Errorf("%w while waiting for reconciliation to finish", errStoppedConcurrently) + case <-ctx.Done(): + return fmt.Errorf("%w while waiting for reconciliation to finish", ctx.Err()) + } +} + +// Stop implements [manager.Component]. 
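+// It blocks until the reconciliation loop has terminated. Calling Stop again
+// after a successful stop is fine and simply reports success for the already
+// stopped loop.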
+func (r *Reconciler) Stop() error { + r.log.Debug("Stopping") + + stopped, err := func() (<-chan struct{}, error) { + r.mu.Lock() + defer r.mu.Unlock() + + switch r.state { + case reconcilerStarted: + go r.requestStop() + r.updates = nil + r.requestStop = nil + r.state = reconcilerStopped + return r.stopped, nil + + case reconcilerStopped: + return r.stopped, nil + + default: + return nil, fmt.Errorf("cannot stop: %s", r.state) + } + }() + if err != nil { + return err + } + + <-stopped + r.log.Info("Stopped") + return nil +} + +type resource interface { + runtime.Object + metav1.Object +} + +func (r *Reconciler) generateResources(snapshot *snapshot) (resources, error) { + configMaps, err := r.buildConfigMaps(snapshot) + if err != nil { + return nil, err + } + + objects := buildRBACResources(configMaps) + for _, configMap := range configMaps { + objects = append(objects, configMap) + } + + // Ensure a stable order, so that reflect.DeepEqual on slices will work. + slices.SortFunc(objects, func(l, r resource) bool { + x := strings.Join([]string{l.GetObjectKind().GroupVersionKind().Kind, l.GetNamespace(), l.GetName()}, "/") + y := strings.Join([]string{r.GetObjectKind().GroupVersionKind().Kind, r.GetNamespace(), r.GetName()}, "/") + return x < y + }) + + resources, err := applier.ToUnstructuredSlice(nil, objects...) + if err != nil { + return nil, err + } + + return resources, nil +} + +func (r *Reconciler) buildConfigMaps(snapshot *snapshot) ([]*corev1.ConfigMap, error) { + workerProfiles := make(map[string]*workerconfig.Profile) + + workerProfile := r.buildProfile(snapshot) + workerProfile.KubeletConfiguration.CgroupsPerQOS = pointer.Bool(true) + workerProfiles["default"] = workerProfile + + workerProfile = r.buildProfile(snapshot) + workerProfile.KubeletConfiguration.CgroupsPerQOS = pointer.Bool(false) + workerProfiles["default-windows"] = workerProfile + + for _, profile := range snapshot.profiles { + workerProfile, ok := workerProfiles[profile.Name] + if !ok { + workerProfile = r.buildProfile(snapshot) + } + if err := yaml.Unmarshal(profile.Config, &workerProfile.KubeletConfiguration); err != nil { + return nil, fmt.Errorf("failed to decode worker profile %q: %w", profile.Name, err) + } + workerProfiles[profile.Name] = workerProfile + } + + var configMaps []*corev1.ConfigMap + for name, workerProfile := range workerProfiles { + configMap, err := toConfigMap(name, workerProfile) + if err != nil { + return nil, fmt.Errorf("failed to generate ConfigMap for worker profile %q: %w", name, err) + } + configMaps = append(configMaps, configMap) + } + + return configMaps, nil +} + +func buildRBACResources(configMaps []*corev1.ConfigMap) []resource { + configMapNames := make([]string, len(configMaps)) + for i, configMap := range configMaps { + configMapNames[i] = configMap.ObjectMeta.Name + } + + // Not strictly necessary, but it guarantees a stable ordering. 
+ sort.Strings(configMapNames) + + meta := metav1.ObjectMeta{ + Name: fmt.Sprintf("system:bootstrappers:%s", constant.WorkerConfigComponentName), + Namespace: "kube-system", + Labels: applier.CommonLabels(constant.WorkerConfigComponentName), + } + + var objects []resource + objects = append(objects, &rbacv1.Role{ + ObjectMeta: meta, + Rules: []rbacv1.PolicyRule{{ + APIGroups: []string{""}, + Resources: []string{"configmaps"}, + Verbs: []string{"get"}, + ResourceNames: configMapNames, + }}, + }) + + objects = append(objects, &rbacv1.RoleBinding{ + ObjectMeta: meta, + RoleRef: rbacv1.RoleRef{ + APIGroup: rbacv1.GroupName, + Kind: "Role", + Name: meta.Name, + }, + Subjects: []rbacv1.Subject{{ + APIGroup: rbacv1.GroupName, + Kind: rbacv1.GroupKind, + Name: "system:bootstrappers", + }, { + APIGroup: rbacv1.GroupName, + Kind: rbacv1.GroupKind, + Name: "system:nodes", + }}, + }) + + return objects +} + +func (r *Reconciler) buildProfile(snapshot *snapshot) *workerconfig.Profile { + workerProfile := &workerconfig.Profile{ + KubeletConfiguration: kubeletv1beta1.KubeletConfiguration{ + TypeMeta: metav1.TypeMeta{ + APIVersion: kubeletv1beta1.SchemeGroupVersion.String(), + Kind: "KubeletConfiguration", + }, + ClusterDNS: []string{r.clusterDNSIP.String()}, + ClusterDomain: r.clusterDomain, + TLSCipherSuites: []string{ + "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256", + "TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384", + "TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305", + "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256", + "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384", + "TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305", + "TLS_RSA_WITH_AES_128_GCM_SHA256", + "TLS_RSA_WITH_AES_256_GCM_SHA384", + }, + FailSwapOn: pointer.Bool(false), + RotateCertificates: true, + ServerTLSBootstrap: true, + EventRecordQPS: pointer.Int32(0), + }, + } + + return workerProfile +} + +func toConfigMap(profileName string, profile *workerconfig.Profile) (*corev1.ConfigMap, error) { + data, err := workerconfig.ToConfigMapData(profile) + if err != nil { + return nil, err + } + + return &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s-%s", constant.WorkerConfigComponentName, profileName, constant.KubernetesMajorMinorVersion), + Namespace: "kube-system", + Labels: applier. + CommonLabels(constant.WorkerConfigComponentName). + With("k0s.k0sproject.io/worker-profile", profileName), + }, + Data: data, + }, nil +} diff --git a/pkg/component/controller/workerconfig/reconciler_test.go b/pkg/component/controller/workerconfig/reconciler_test.go new file mode 100644 index 000000000000..ac301dce75f3 --- /dev/null +++ b/pkg/component/controller/workerconfig/reconciler_test.go @@ -0,0 +1,790 @@ +/* +Copyright 2022 k0s authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package workerconfig + +import ( + "context" + "encoding/json" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/k0sproject/k0s/internal/testutil" + "github.com/k0sproject/k0s/pkg/apis/k0s.k0sproject.io/v1beta1" + "github.com/k0sproject/k0s/pkg/component/controller/leaderelector" + "github.com/k0sproject/k0s/pkg/constant" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + kubeletv1beta1 "k8s.io/kubelet/config/v1beta1" + "k8s.io/utils/pointer" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "sigs.k8s.io/yaml" +) + +type kubeletConfig = kubeletv1beta1.KubeletConfiguration + +func TestReconciler_Lifecycle(t *testing.T) { + createdReconciler := func(t *testing.T) *Reconciler { + t.Helper() + underTest, err := NewReconciler( + constant.GetConfig(t.TempDir()), + &v1beta1.ClusterSpec{ + API: &v1beta1.APISpec{}, + Network: &v1beta1.Network{ + ClusterDomain: "test.local", + ServiceCIDR: "99.99.99.0/24", + }, + }, + testutil.NewFakeClientFactory(), + &leaderelector.Dummy{Leader: true}, + ) + require.NoError(t, err) + underTest.log = newTestLogger(t) + return underTest + } + + t.Run("when_created", func(t *testing.T) { + + t.Run("init_succeeds", func(t *testing.T) { + underTest := createdReconciler(t) + + err := underTest.Init(testContext(t)) + + assert.NoError(t, err) + }) + + t.Run("start_fails", func(t *testing.T) { + underTest := createdReconciler(t) + + err := underTest.Start(testContext(t)) + + require.Error(t, err) + assert.Equal(t, "cannot start, not initialized: created", err.Error()) + }) + + t.Run("reconcile_fails", func(t *testing.T) { + underTest := createdReconciler(t) + + err := underTest.Reconcile(testContext(t), v1beta1.DefaultClusterConfig(nil)) + + require.Error(t, err) + assert.Equal(t, "cannot reconcile, not started: created", err.Error()) + }) + + t.Run("stop_fails", func(t *testing.T) { + underTest := createdReconciler(t) + + err := underTest.Stop() + + require.Error(t, err) + assert.Equal(t, "cannot stop: created", err.Error()) + }) + }) + + initializedReconciler := func(t *testing.T) *Reconciler { + t.Helper() + underTest := createdReconciler(t) + require.NoError(t, underTest.Init(testContext(t))) + return underTest + } + + t.Run("when_initialized", func(t *testing.T) { + + t.Run("another_init_fails", func(t *testing.T) { + underTest := initializedReconciler(t) + + err := underTest.Init(testContext(t)) + + require.Error(t, err) + assert.Equal(t, "cannot initialize, not created: initialized", err.Error()) + }) + + t.Run("start_and_stop_succeed", func(t *testing.T) { + underTest := initializedReconciler(t) + + require.NoError(t, underTest.Start(testContext(t))) + assert.NoError(t, underTest.Stop()) + }) + + t.Run("reconcile_fails", func(t *testing.T) { + underTest := initializedReconciler(t) + + err := underTest.Reconcile(testContext(t), v1beta1.DefaultClusterConfig(nil)) + + require.Error(t, err) + assert.Equal(t, "cannot reconcile, not started: initialized", err.Error()) + }) + + t.Run("stop_fails", func(t *testing.T) { + underTest := initializedReconciler(t) + + err := underTest.Stop() + + require.Error(t, err) + assert.Equal(t, "cannot stop: initialized", err.Error()) + }) + }) + + startedReconciler := func(t *testing.T) (*Reconciler, *mockApplier) { + t.Helper() + underTest := initializedReconciler(t) + mockApplier := installMockApplier(t, underTest) + require.NoError(t, underTest.Start(testContext(t))) + t.Cleanup(func() { + err := 
underTest.Stop() + if !t.Failed() { + assert.NoError(t, err) + } + }) + return underTest, mockApplier + } + + t.Run("when_started", func(t *testing.T) { + + t.Run("init_fails", func(t *testing.T) { + underTest, _ := startedReconciler(t) + + err := underTest.Init(testContext(t)) + + require.Error(t, err) + assert.Equal(t, "cannot initialize, not created: started", err.Error()) + }) + + t.Run("another_start_fails", func(t *testing.T) { + underTest, _ := startedReconciler(t) + + err := underTest.Start(testContext(t)) + + require.Error(t, err) + assert.Equal(t, "cannot start, not initialized: started", err.Error()) + }) + + t.Run("reconcile_succeeds", func(t *testing.T) { + underTest, mockApplier := startedReconciler(t) + applied := mockApplier.expectApply(t, nil) + + assert.NoError(t, underTest.Reconcile(testContext(t), v1beta1.DefaultClusterConfig(nil))) + + assert.NotEmpty(t, applied(), "Expected some resources to be applied") + }) + + t.Run("stop_succeeds", func(t *testing.T) { + underTest, _ := startedReconciler(t) + + err := underTest.Stop() + + assert.NoError(t, err) + }) + }) + + reconciledReconciler := func(t *testing.T) (*Reconciler, *mockApplier) { + t.Helper() + underTest, mockApplier := startedReconciler(t) + applied := mockApplier.expectApply(t, nil) + require.NoError(t, underTest.Reconcile(testContext(t), v1beta1.DefaultClusterConfig(nil))) + + _ = applied() // wait until reconciliation happened + return underTest, mockApplier + } + + t.Run("when_reconciled", func(t *testing.T) { + t.Run("init_fails", func(t *testing.T) { + underTest, _ := reconciledReconciler(t) + + err := underTest.Init(testContext(t)) + + require.Error(t, err) + assert.Equal(t, "cannot initialize, not created: started", err.Error()) + }) + + t.Run("start_fails", func(t *testing.T) { + underTest, _ := reconciledReconciler(t) + + err := underTest.Start(testContext(t)) + + require.Error(t, err) + assert.Equal(t, "cannot start, not initialized: started", err.Error()) + }) + + t.Run("another_reconcile_succeeds", func(t *testing.T) { + underTest, mockApplier := reconciledReconciler(t) + applied := mockApplier.expectApply(t, nil) + clusterConfig := v1beta1.DefaultClusterConfig(nil) + clusterConfig.Spec.WorkerProfiles = v1beta1.WorkerProfiles{ + {Name: "foo", Config: json.RawMessage("{}")}, + } + + assert.NoError(t, underTest.Reconcile(testContext(t), clusterConfig)) + + assert.NotEmpty(t, applied(), "Expected some resources to be applied") + }) + + t.Run("stop_succeeds", func(t *testing.T) { + underTest, _ := reconciledReconciler(t) + + err := underTest.Stop() + + assert.NoError(t, err) + }) + }) + + stoppedReconciler := func(t *testing.T) *Reconciler { + t.Helper() + underTest, _ := reconciledReconciler(t) + require.NoError(t, underTest.Stop()) + return underTest + } + + t.Run("when_stopped", func(t *testing.T) { + + t.Run("init_fails", func(t *testing.T) { + underTest := stoppedReconciler(t) + + err := underTest.Init(testContext(t)) + + require.Error(t, err) + assert.Equal(t, "cannot initialize, not created: stopped", err.Error()) + }) + + t.Run("start_fails", func(t *testing.T) { + underTest := stoppedReconciler(t) + + err := underTest.Start(testContext(t)) + + require.Error(t, err) + assert.Equal(t, "cannot start, not initialized: stopped", err.Error()) + }) + + t.Run("reconcile_fails", func(t *testing.T) { + underTest := stoppedReconciler(t) + + err := underTest.Reconcile(testContext(t), v1beta1.DefaultClusterConfig(nil)) + + require.Error(t, err) + assert.Equal(t, "cannot reconcile, not started: stopped", 
err.Error()) + }) + + t.Run("stop_succeeds", func(t *testing.T) { + underTest := stoppedReconciler(t) + + err := underTest.Stop() + + assert.NoError(t, err) + }) + }) +} + +func TestReconciler_ResourceGeneration(t *testing.T) { + underTest, err := NewReconciler( + constant.GetConfig(t.TempDir()), + &v1beta1.ClusterSpec{ + API: &v1beta1.APISpec{}, + Network: &v1beta1.Network{ + ClusterDomain: "test.local", + ServiceCIDR: "99.99.99.0/24", + }, + }, + testutil.NewFakeClientFactory(), + &leaderelector.Dummy{Leader: true}, + ) + require.NoError(t, err) + underTest.log = newTestLogger(t) + + require.NoError(t, underTest.Init(context.TODO())) + + mockApplier := installMockApplier(t, underTest) + + require.NoError(t, underTest.Start(context.TODO())) + t.Cleanup(func() { + assert.NoError(t, underTest.Stop()) + }) + + applied := mockApplier.expectApply(t, nil) + + require.NoError(t, underTest.Reconcile(context.TODO(), &v1beta1.ClusterConfig{ + Spec: &v1beta1.ClusterSpec{ + WorkerProfiles: v1beta1.WorkerProfiles{{ + Name: "profile_XXX", + Config: []byte(`{"authentication": {"anonymous": {"enabled": true}}}`), + }, { + Name: "profile_YYY", + Config: []byte(`{"authentication": {"webhook": {"cacheTTL": "15s"}}}`), + }}, + }, + })) + + configMaps := map[string]func(t *testing.T, expected *kubeletConfig){ + "worker-config-default-1.26": func(t *testing.T, expected *kubeletConfig) { + expected.CgroupsPerQOS = pointer.Bool(true) + }, + + "worker-config-default-windows-1.26": func(t *testing.T, expected *kubeletConfig) { + expected.CgroupsPerQOS = pointer.Bool(false) + }, + + "worker-config-profile_XXX-1.26": func(t *testing.T, expected *kubeletConfig) { + expected.Authentication.Anonymous.Enabled = pointer.Bool(true) + }, + + "worker-config-profile_YYY-1.26": func(t *testing.T, expected *kubeletConfig) { + expected.Authentication.Webhook.CacheTTL = metav1.Duration{Duration: 15 * time.Second} + }, + } + + appliedResources := applied() + assert.Len(t, appliedResources, len(configMaps)+2) + + for name, mod := range configMaps { + t.Run(name, func(t *testing.T) { + kubelet := requireKubelet(t, appliedResources, name) + expected := makeKubeletConfig(t, func(expected *kubeletConfig) { mod(t, expected) }) + assert.JSONEq(t, expected, kubelet) + }) + } + + const rbacName = "system:bootstrappers:worker-config" + + t.Run("Role", func(t *testing.T) { + role := findResource(t, "Expected to find a Role named "+rbacName, + appliedResources, func(resource *unstructured.Unstructured) bool { + return resource.GetKind() == "Role" && resource.GetName() == rbacName + }, + ) + + rules, ok, err := unstructured.NestedSlice(role.Object, "rules") + require.NoError(t, err) + require.True(t, ok, "No rules field") + require.Len(t, rules, 1, "Expected a single rule") + + rule, ok := rules[0].(map[string]any) + require.True(t, ok, "Invalid rule") + + resourceNames, ok, err := unstructured.NestedStringSlice(rule, "resourceNames") + require.NoError(t, err) + require.True(t, ok, "No resourceNames field") + + assert.Len(t, resourceNames, len(configMaps)) + for expected := range configMaps { + assert.Contains(t, resourceNames, expected) + } + }) + + t.Run("RoleBinding", func(t *testing.T) { + binding := findResource(t, "Expected to find a RoleBinding named "+rbacName, + appliedResources, func(resource *unstructured.Unstructured) bool { + return resource.GetKind() == "RoleBinding" && resource.GetName() == rbacName + }, + ) + + roleRef, ok, err := unstructured.NestedMap(binding.Object, "roleRef") + if assert.NoError(t, err) && assert.True(t, 
ok, "No roleRef field") { + expected := map[string]any{ + "apiGroup": "rbac.authorization.k8s.io", + "kind": "Role", + "name": rbacName, + } + + assert.Equal(t, expected, roleRef) + } + + subjects, ok, err := unstructured.NestedSlice(binding.Object, "subjects") + if assert.NoError(t, err) && assert.True(t, ok, "No subjects field") { + expected := []any{map[string]any{ + "apiGroup": "rbac.authorization.k8s.io", + "kind": "Group", + "name": "system:bootstrappers", + }, map[string]any{ + "apiGroup": "rbac.authorization.k8s.io", + "kind": "Group", + "name": "system:nodes", + }} + + assert.Equal(t, expected, subjects) + } + }) +} + +func TestReconciler_ReconcilesOnChangesOnly(t *testing.T) { + cluster := v1beta1.DefaultClusterConfig(nil) + underTest, err := NewReconciler( + constant.GetConfig(t.TempDir()), + &v1beta1.ClusterSpec{ + API: &v1beta1.APISpec{}, + Network: &v1beta1.Network{ + ClusterDomain: "test.local", + ServiceCIDR: "99.99.99.0/24", + }, + }, + testutil.NewFakeClientFactory(), + &leaderelector.Dummy{Leader: true}, + ) + require.NoError(t, err) + underTest.log = newTestLogger(t) + + require.NoError(t, underTest.Init(context.TODO())) + + mockApplier := installMockApplier(t, underTest) + + require.NoError(t, underTest.Start(context.TODO())) + t.Cleanup(func() { + assert.NoError(t, underTest.Stop()) + }) + + expectApply := func(t *testing.T) { + t.Helper() + applied := mockApplier.expectApply(t, nil) + assert.NoError(t, underTest.Reconcile(context.TODO(), cluster)) + appliedResources := applied() + assert.NotEmpty(t, applied, "Expected some resources to be applied") + + for _, r := range appliedResources { + pp, found, err := unstructured.NestedString(r.Object, "data", "defaultImagePullPolicy") + assert.NoError(t, err) + if found { + t.Logf("%s/%s: %s", r.GetKind(), r.GetName(), pp) + } + } + } + + expectCached := func(t *testing.T) { + t.Helper() + assert.NoError(t, underTest.Reconcile(context.TODO(), cluster)) + } + + expectApplyButFail := func(t *testing.T) { + t.Helper() + applied := mockApplier.expectApply(t, assert.AnError) + assert.ErrorIs(t, underTest.Reconcile(context.TODO(), cluster), assert.AnError) + assert.NotEmpty(t, applied(), "Expected some resources to be applied") + } + + // Set some value that affects worker configs. + cluster.Spec.WorkerProfiles = v1beta1.WorkerProfiles{{Name: "foo", Config: json.RawMessage(`{"nodeLeaseDurationSeconds": 1}`)}} + t.Run("first_time_apply", expectApply) + t.Run("second_time_cached", expectCached) + + // Change that value, so that configs need to be reapplied. + cluster.Spec.WorkerProfiles = v1beta1.WorkerProfiles{{Name: "foo", Config: json.RawMessage(`{"nodeLeaseDurationSeconds": 2}`)}} + t.Run("third_time_apply_fails", expectApplyButFail) + + // After an error, expect a reapplication in any case. + t.Run("fourth_time_apply", expectApply) + + // Even if the last successfully applied config is restored, expect it to be applied after a failure. + cluster.Spec.WorkerProfiles = v1beta1.WorkerProfiles{{Name: "foo", Config: json.RawMessage(`{"nodeLeaseDurationSeconds": 1}`)}} + t.Run("fifth_time_apply", expectApply) + t.Run("sixth_time_cached", expectCached) +} + +func TestReconciler_runReconcileLoop(t *testing.T) { + underTest := Reconciler{ + log: newTestLogger(t), + leaderElector: &leaderelector.Dummy{Leader: true}, + } + + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + + // Prepare update channel for two updates. 
+ updates, firstDone, secondDone := make(chan updateFunc, 2), make(chan error, 1), make(chan error, 1) + + // Put in the first update. + updates <- func(s *snapshot) chan<- error { return firstDone } + + // Put in the second update that'll cancel the context. + updates <- func(s *snapshot) chan<- error { cancel(); return secondDone } + + underTest.runReconcileLoop(ctx, updates, nil) + + switch ctx.Err() { + case context.Canceled: + break // this is the good case + case context.DeadlineExceeded: + assert.Fail(t, "Test timed out") + case nil: + assert.Fail(t, "Loop exited even if the context isn't done") + default: + assert.Fail(t, "Unexpected context error", ctx.Err()) + } + + if assert.Len(t, firstDone, 1, "First done channel should contain exactly one element") { + err, ok := <-firstDone + assert.True(t, ok) + assert.NoError(t, err, "Error sent to first done channel when none was expected") + } + + select { + case _, ok := <-firstDone: + assert.False(t, ok, "More than one element sent to first done channel") + default: + assert.Fail(t, "First done channel is still open") + } + + if assert.Len(t, secondDone, 1, "Second done channel should contain exactly one element") { + err, ok := <-secondDone + assert.True(t, ok) + if assert.ErrorIs(t, err, errStoppedConcurrently, "Second done channel didn't indicate concurrent stopping") { + assert.Equal(t, "stopped concurrently while processing reconciliation", err.Error()) + } + } +} + +func TestReconciler_LeaderElection(t *testing.T) { + var le mockLeaderElector + cluster := v1beta1.DefaultClusterConfig(nil) + underTest, err := NewReconciler( + constant.GetConfig(t.TempDir()), + &v1beta1.ClusterSpec{ + API: &v1beta1.APISpec{}, + Network: &v1beta1.Network{ + ClusterDomain: "test.local", + ServiceCIDR: "99.99.99.0/24", + }, + }, + testutil.NewFakeClientFactory(), + &le, + ) + require.NoError(t, err) + underTest.log = newTestLogger(t) + + require.NoError(t, underTest.Init(context.TODO())) + + mockApplier := installMockApplier(t, underTest) + + require.NoError(t, underTest.Start(context.TODO())) + t.Cleanup(func() { + assert.NoError(t, underTest.Stop()) + }) + + // Nothing should be applied here, since the leader lease is not acquired. + assert.NoError(t, underTest.Reconcile(context.TODO(), cluster)) + + // Activate the leader lease and expect the resources to be applied. + applied := mockApplier.expectApply(t, nil) + le.activate() + assert.NotEmpty(t, applied(), "Expected some resources to be applied") + + // Deactivate the lease in order to reactivate it a second time. + le.deactivate() + + // Reactivate the lease and expect another reconciliation, even if the config didn't change. 
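+	// (Reacquiring the lease bumps the reconciler's internal serial counter, so
+	// the desired state differs from the last reconciled one again.)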
+ applied = mockApplier.expectApply(t, nil) + le.activate() + assert.NotEmpty(t, applied(), "Expected some resources to be applied") +} + +func testContext(t *testing.T) context.Context { + ctx, cancel := context.WithCancel(context.TODO()) + timeout := time.AfterFunc(10*time.Second, func() { + assert.Fail(t, "Test context timed out after 10 seconds") + cancel() + }) + t.Cleanup(func() { + timeout.Stop() + cancel() + }) + return ctx +} + +func newTestLogger(t *testing.T) logrus.FieldLogger { + log := logrus.New() + log.SetLevel(logrus.DebugLevel) + return log.WithField("test", t.Name()) +} + +func requireKubelet(t *testing.T, resources []*unstructured.Unstructured, name string) string { + configMap := findResource(t, "No ConfigMap found with name "+name, + resources, func(resource *unstructured.Unstructured) bool { + return resource.GetKind() == "ConfigMap" && resource.GetName() == name + }, + ) + kubeletConfigYAML, ok, err := unstructured.NestedString(configMap.Object, "data", "kubeletConfiguration") + require.NoError(t, err) + require.True(t, ok, "No data.kubeletConfiguration field") + kubeletConfigJSON, err := yaml.YAMLToJSONStrict([]byte(kubeletConfigYAML)) + require.NoError(t, err) + return string(kubeletConfigJSON) +} + +func findResource(t *testing.T, failureMessage string, resources resources, probe func(*unstructured.Unstructured) bool) *unstructured.Unstructured { + for _, resource := range resources { + if probe(resource) { + return resource + } + } + require.Fail(t, failureMessage) + return nil +} + +func makeKubeletConfig(t *testing.T, mods ...func(*kubeletConfig)) string { + kubeletConfig := kubeletConfig{ + TypeMeta: metav1.TypeMeta{ + APIVersion: kubeletv1beta1.SchemeGroupVersion.String(), + Kind: "KubeletConfiguration", + }, + ClusterDNS: []string{"99.99.99.10"}, + ClusterDomain: "test.local", + EventRecordQPS: pointer.Int32(0), + FailSwapOn: pointer.Bool(false), + RotateCertificates: true, + ServerTLSBootstrap: true, + TLSCipherSuites: []string{ + "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256", + "TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384", + "TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305", + "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256", + "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384", + "TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305", + "TLS_RSA_WITH_AES_128_GCM_SHA256", + "TLS_RSA_WITH_AES_256_GCM_SHA384", + }, + } + + for _, mod := range mods { + mod(&kubeletConfig) + } + + json, err := json.Marshal(&kubeletConfig) + require.NoError(t, err) + return string(json) +} + +type mockApplier struct { + ptr atomic.Pointer[[]mockApplierCall] +} + +type mockApplierCall = func(resources) error + +func (m *mockApplier) expectApply(t *testing.T, retval error) func() resources { + ch := make(chan resources, 1) + + recv := func() resources { + select { + case lastCalled, ok := <-ch: + if !ok { + require.Fail(t, "Channel closed unexpectedly") + } + return lastCalled + + case <-time.After(10 * time.Second): + require.Fail(t, "Timed out while waiting for call to apply()") + return nil // function diverges above + } + } + + send := func(r resources) error { + defer close(ch) + ch <- r + return retval + } + + for { + calls := m.ptr.Load() + len := len(*calls) + newCalls := make([]mockApplierCall, len+1) + copy(newCalls, *calls) + newCalls[len] = send + if m.ptr.CompareAndSwap(calls, &newCalls) { + break + } + } + + return recv +} + +func installMockApplier(t *testing.T, underTest *Reconciler) *mockApplier { + t.Helper() + mockApplier := mockApplier{} + mockApplier.ptr.Store(new([]mockApplierCall)) + + underTest.mu.Lock() + 
defer underTest.mu.Unlock() + + require.Equal(t, reconcilerInitialized, underTest.state, "unexpected state") + require.NotNil(t, underTest.apply) + t.Cleanup(func() { + for _, call := range *mockApplier.ptr.Swap(nil) { + assert.NoError(t, call(nil)) + } + }) + + underTest.apply = func(ctx context.Context, r resources) error { + if r == nil { + panic("cannot call apply() with nil resources") + } + + for { + expected := mockApplier.ptr.Load() + if len(*expected) < 1 { + panic("unexpected call to apply") + } + + newExpected := (*expected)[1:] + if mockApplier.ptr.CompareAndSwap(expected, &newExpected) { + return (*expected)[0](r) + } + } + } + + return &mockApplier +} + +type mockLeaderElector struct { + mu sync.Mutex + leader bool + acquired []func() +} + +func (e *mockLeaderElector) activate() { + e.mu.Lock() + defer e.mu.Unlock() + if !e.leader { + e.leader = true + for _, fn := range e.acquired { + fn() + } + } +} + +func (e *mockLeaderElector) deactivate() { + e.mu.Lock() + defer e.mu.Unlock() + e.leader = false +} + +func (e *mockLeaderElector) IsLeader() bool { + e.mu.Lock() + defer e.mu.Unlock() + return e.leader +} + +func (e *mockLeaderElector) AddAcquiredLeaseCallback(fn func()) { + e.mu.Lock() + defer e.mu.Unlock() + e.acquired = append(e.acquired, fn) + if e.leader { + fn() + } +} + +func (e *mockLeaderElector) AddLostLeaseCallback(func()) { + panic("not expected to be called in tests") +} diff --git a/pkg/component/controller/workerconfig/snapshot.go b/pkg/component/controller/workerconfig/snapshot.go new file mode 100644 index 000000000000..308596a8c60a --- /dev/null +++ b/pkg/component/controller/workerconfig/snapshot.go @@ -0,0 +1,72 @@ +/* +Copyright 2022 k0s authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workerconfig + +import ( + "github.com/k0sproject/k0s/pkg/apis/k0s.k0sproject.io/v1beta1" +) + +// snapshot holds a snapshot of the parts that influence worker configurations. +type snapshot struct { + // The snapshot of the cluster configuration. + *configSnapshot + + // A simple counter that can be incremented every time a reconciliation + // shall be enforced, even if the rest of the snapshot still matches the + // last reconciled state. + serial uint +} + +// configSnapshot holds a snapshot of the parts of the cluster config spec that +// influence worker configurations. 
+type configSnapshot struct { + profiles v1beta1.WorkerProfiles +} + +func (s *snapshot) DeepCopy() *snapshot { + if s == nil { + return nil + } + out := new(snapshot) + s.DeepCopyInto(out) + return out +} + +func (s *snapshot) DeepCopyInto(out *snapshot) { + *out = *s + out.configSnapshot = s.configSnapshot.DeepCopy() +} + +func (s *configSnapshot) DeepCopy() *configSnapshot { + if s == nil { + return nil + } + out := new(configSnapshot) + s.DeepCopyInto(out) + return out +} + +func (s *configSnapshot) DeepCopyInto(out *configSnapshot) { + *out = *s + out.profiles = s.profiles.DeepCopy() +} + +func takeConfigSnapshot(spec *v1beta1.ClusterSpec) configSnapshot { + return configSnapshot{ + spec.WorkerProfiles.DeepCopy(), + } +} diff --git a/pkg/component/worker/config/config.go b/pkg/component/worker/config/config.go new file mode 100644 index 000000000000..0427574763ea --- /dev/null +++ b/pkg/component/worker/config/config.go @@ -0,0 +1,101 @@ +/* +Copyright 2022 k0s authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "encoding/json" + "fmt" + "reflect" + + kubeletv1beta1 "k8s.io/kubelet/config/v1beta1" + + "go.uber.org/multierr" + "sigs.k8s.io/yaml" +) + +type Profile struct { + KubeletConfiguration kubeletv1beta1.KubeletConfiguration +} + +func (p *Profile) DeepCopy() *Profile { + if p == nil { + return nil + } + out := new(Profile) + p.DeepCopyInto(out) + return out +} + +func (p *Profile) DeepCopyInto(out *Profile) { + *out = *p + p.KubeletConfiguration.DeepCopyInto(&out.KubeletConfiguration) +} + +func FromConfigMapData(data map[string]string) (*Profile, error) { + var config Profile + var errs error + forEachConfigMapEntry(&config, func(fieldName string, ptr any) { + data, ok := data[fieldName] + if ok { + if err := yaml.Unmarshal([]byte(data), ptr); err != nil { + errs = multierr.Append(errs, fmt.Errorf("%s: %w", fieldName, err)) + } + } + }) + + if errs != nil { + return nil, errs + } + + return &config, nil +} + +func ToConfigMapData(profile *Profile) (map[string]string, error) { + data := make(map[string]string) + + if profile == nil { + return data, nil + } + + var errs error + forEachConfigMapEntry(profile, func(fieldName string, ptr any) { + if reflect.ValueOf(ptr).Elem().IsZero() { + return + } + bytes, err := json.Marshal(ptr) + if err != nil { + errs = multierr.Append(errs, fmt.Errorf("%s: %w", fieldName, err)) + return + } + + data[fieldName] = string(bytes) + }) + + if errs != nil { + return nil, errs + } + + return data, nil +} + +func forEachConfigMapEntry(profile *Profile, f func(fieldName string, ptr any)) { + for fieldName, ptr := range map[string]any{ + "kubeletConfiguration": &profile.KubeletConfiguration, + } { + f(fieldName, ptr) + } +} diff --git a/pkg/component/worker/config/config_test.go b/pkg/component/worker/config/config_test.go new file mode 100644 index 000000000000..df27375c521c --- /dev/null +++ b/pkg/component/worker/config/config_test.go @@ -0,0 +1,93 @@ +/* +Copyright 2022 k0s authors + +Licensed under the Apache License, Version 2.0 
(the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "testing" + + "k8s.io/apimachinery/pkg/api/resource" + logsv1 "k8s.io/component-base/logs/api/v1" + kubeletv1beta1 "k8s.io/kubelet/config/v1beta1" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestToConfigMapData(t *testing.T) { + for _, test := range append(roundtripTests, + roundtripTest{"nil_produces_zero", nil, map[string]string{}}, + ) { + t.Run(test.name, func(t *testing.T) { + data, err := ToConfigMapData(test.profile) + require.NoError(t, err) + assert.Equal(t, test.data, data) + }) + } +} + +func TestFromConfigMapData(t *testing.T) { + for _, test := range append(roundtripTests, + roundtripTest{"nil_produces_zero", &Profile{}, nil}, + ) { + t.Run(test.name, func(t *testing.T) { + config, err := FromConfigMapData(test.data) + require.NoError(t, err) + require.NotNil(t, config) + assert.Equal(t, test.profile, config) + }) + } + + t.Run("collects_errors", func(t *testing.T) { + data := map[string]string{ + "kubeletConfiguration": "1", + } + + config, err := FromConfigMapData(data) + assert.ErrorContains(t, err, "json: cannot unmarshal number into Go value of type") + assert.Nil(t, config) + }) +} + +type roundtripTest struct { + name string + profile *Profile + data map[string]string +} + +var roundtripTests = []roundtripTest{ + {"empty", &Profile{}, map[string]string{}}, + { + "kubelet", + &Profile{ + KubeletConfiguration: kubeletv1beta1.KubeletConfiguration{ + Logging: logsv1.LoggingConfiguration{ + Options: logsv1.FormatOptions{ + JSON: logsv1.JSONOptions{ + InfoBufferSize: resource.QuantityValue{ + // This will be set as default by the unmarshaler. + Quantity: resource.MustParse("0"), + }, + }, + }, + }, + }, + }, + map[string]string{ + "kubeletConfiguration": `{"syncFrequency":"0s","fileCheckFrequency":"0s","httpCheckFrequency":"0s","authentication":{"x509":{},"webhook":{"cacheTTL":"0s"},"anonymous":{}},"authorization":{"webhook":{"cacheAuthorizedTTL":"0s","cacheUnauthorizedTTL":"0s"}},"streamingConnectionIdleTimeout":"0s","nodeStatusUpdateFrequency":"0s","nodeStatusReportFrequency":"0s","imageMinimumGCAge":"0s","volumeStatsAggPeriod":"0s","cpuManagerReconcilePeriod":"0s","runtimeRequestTimeout":"0s","evictionPressureTransitionPeriod":"0s","memorySwap":{},"logging":{"flushFrequency":0,"verbosity":0,"options":{"json":{"infoBufferSize":"0"}}},"shutdownGracePeriod":"0s","shutdownGracePeriodCriticalPods":"0s"}`, + }, + }, +} diff --git a/pkg/component/worker/config/loader.go b/pkg/component/worker/config/loader.go new file mode 100644 index 000000000000..eed9abcfb9b0 --- /dev/null +++ b/pkg/component/worker/config/loader.go @@ -0,0 +1,86 @@ +/* +Copyright 2022 k0s authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "context" + "fmt" + "time" + + "github.com/k0sproject/k0s/pkg/constant" + kubeutil "github.com/k0sproject/k0s/pkg/kubernetes" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" + + "github.com/avast/retry-go" + "github.com/sirupsen/logrus" +) + +// LoadProfile loads the worker profile with the given profile name from +// Kubernetes. +func LoadProfile(ctx context.Context, kubeconfig clientcmd.KubeconfigGetter, profileName string) (*Profile, error) { + clientConfig, err := kubeutil.ClientConfig(kubeconfig) + if err != nil { + return nil, fmt.Errorf("failed to create Kubernetes client config: %w", err) + } + + clientFactory := func() (kubernetes.Interface, error) { + return kubernetes.NewForConfig(clientConfig) + } + + return loadProfile(ctx, logrus.StandardLogger(), clientFactory, profileName) +} + +func loadProfile(ctx context.Context, log logrus.FieldLogger, clientFactory func() (kubernetes.Interface, error), profileName string) (*Profile, error) { + configMapName := fmt.Sprintf("%s-%s-%s", constant.WorkerConfigComponentName, profileName, constant.KubernetesMajorMinorVersion) + + var configMap *corev1.ConfigMap + if err := retry.Do( + func() error { + client, err := clientFactory() + if err != nil { + return err + } + + configMap, err = client.CoreV1().ConfigMaps("kube-system").Get(ctx, configMapName, metav1.GetOptions{}) + return err + }, + retry.Context(ctx), + retry.LastErrorOnly(true), + retry.Delay(500*time.Millisecond), + retry.OnRetry(func(attempt uint, err error) { + log.WithError(err).Debugf("Failed to load configuration for worker profile in attempt #%d, retrying after backoff", attempt+1) + }), + ); err != nil { + if apierrors.IsUnauthorized(err) { + err = fmt.Errorf("the k0s worker node credentials are invalid, the node needs to be rejoined into the cluster with a fresh bootstrap token: %w", err) + } + + return nil, err + } + + profile, err := FromConfigMapData(configMap.Data) + if err != nil { + return nil, err + } + + return profile, nil +} diff --git a/pkg/component/worker/config/loader_test.go b/pkg/component/worker/config/loader_test.go new file mode 100644 index 000000000000..3583983358a8 --- /dev/null +++ b/pkg/component/worker/config/loader_test.go @@ -0,0 +1,87 @@ +/* +Copyright 2022 k0s authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package config + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/k0sproject/k0s/pkg/constant" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" + kubeletv1beta1 "k8s.io/kubelet/config/v1beta1" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestLoadProfile(t *testing.T) { + workerConfigMap := corev1.ConfigMap{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "ConfigMap", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("worker-config-fake-%s", constant.KubernetesMajorMinorVersion), + Namespace: "kube-system", + }, + Data: map[string]string{"kubeletConfiguration": `{"kind":"foo"}`}, + } + + clientFactory := func() (kubernetes.Interface, error) { + return fake.NewSimpleClientset(&workerConfigMap), nil + } + + workerProfile, err := func() (*Profile, error) { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + timer := time.AfterFunc(10*time.Second, func() { + assert.Fail(t, "Call to Loader.Load() took too long, check the logs for details") + cancel() + }) + defer timer.Stop() + + log := logrus.New() + log.SetLevel(logrus.DebugLevel) + + workerConfig, err := loadProfile( + ctx, + log.WithField("test", t.Name()), + clientFactory, + "fake", + ) + + if t.Failed() { + t.FailNow() + } + return workerConfig, err + }() + + require.NoError(t, err) + if assert.NotNil(t, workerProfile) { + expected := kubeletv1beta1.KubeletConfiguration{ + TypeMeta: metav1.TypeMeta{Kind: "foo"}, + } + assert.Equal(t, &expected, &workerProfile.KubeletConfiguration) + } +} diff --git a/pkg/component/worker/kubelet.go b/pkg/component/worker/kubelet.go index e5d9038125c7..bb03b716ebe2 100644 --- a/pkg/component/worker/kubelet.go +++ b/pkg/component/worker/kubelet.go @@ -26,16 +26,6 @@ import ( "path/filepath" "runtime" "strings" - "time" - - "github.com/avast/retry-go" - "github.com/docker/libnetwork/resolvconf" - "github.com/sirupsen/logrus" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/util/validation" - kubeletv1beta1 "k8s.io/kubelet/config/v1beta1" - "k8s.io/utils/pointer" - "sigs.k8s.io/yaml" "github.com/k0sproject/k0s/internal/pkg/dir" "github.com/k0sproject/k0s/internal/pkg/file" @@ -46,6 +36,15 @@ import ( "github.com/k0sproject/k0s/pkg/component/manager" "github.com/k0sproject/k0s/pkg/constant" "github.com/k0sproject/k0s/pkg/supervisor" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/validation" + kubeletv1beta1 "k8s.io/kubelet/config/v1beta1" + "k8s.io/utils/pointer" + + "github.com/docker/libnetwork/resolvconf" + "github.com/sirupsen/logrus" + "sigs.k8s.io/yaml" ) // Kubelet is the component implementation to manage kubelet @@ -53,9 +52,9 @@ type Kubelet struct { CRISocket string EnableCloudProvider bool K0sVars constant.CfgVars - KubeletConfigClient *KubeletConfigClient + Kubeconfig string + Configuration kubeletv1beta1.KubeletConfiguration LogLevel string - Profile string dataDir string supervisor supervisor.Supervisor ClusterDNS string @@ -152,7 +151,7 @@ func (k *Kubelet) Start(ctx context.Context) error { args := stringmap.StringMap{ "--root-dir": k.dataDir, "--config": kubeletConfigPath, - "--kubeconfig": k.K0sVars.KubeletAuthConfigPath, + "--kubeconfig": k.Kubeconfig, "--v": k.LogLevel, "--runtime-cgroups": "/system.slice/containerd.service", "--cert-dir": filepath.Join(k.dataDir, "pki"), @@ -202,7 +201,7 @@ func (k *Kubelet) Start(ctx 
context.Context) error { args["--cloud-provider"] = "external" } - // Handle the extra args as last so they can be used to overrride some k0s "hardcodings" + // Handle the extra args as last so they can be used to override some k0s "hardcodings" if k.ExtraArgs != "" { extras := flags.Split(k.ExtraArgs) args.Merge(extras) @@ -217,30 +216,15 @@ func (k *Kubelet) Start(ctx context.Context) error { Args: args.ToArgs(), } - err := retry.Do(func() error { - kubeletconfig, err := k.KubeletConfigClient.Get(ctx, k.Profile) - if err != nil { - logrus.Warnf("failed to get initial kubelet config with join token: %s", err.Error()) - return err - } - kubeletconfig, err = k.prepareLocalKubeletConfig(kubeletconfig, kubeletConfigData) - if err != nil { - logrus.Warnf("failed to prepare local kubelet config: %s", err.Error()) - return err - } - err = file.WriteContentAtomically(kubeletConfigPath, []byte(kubeletconfig), 0644) - if err != nil { - return fmt.Errorf("failed to write kubelet config: %w", err) - } - - return nil - }, - retry.Context(ctx), - retry.Delay(time.Millisecond*500), - retry.DelayType(retry.BackOffDelay)) + kubeletconfig, err := k.prepareLocalKubeletConfig(kubeletConfigData) if err != nil { + logrus.Warnf("failed to prepare local kubelet config: %s", err.Error()) return err } + err = file.WriteContentAtomically(kubeletConfigPath, []byte(kubeletconfig), 0644) + if err != nil { + return fmt.Errorf("failed to write kubelet config: %w", err) + } return k.supervisor.Supervise() } @@ -250,19 +234,14 @@ func (k *Kubelet) Stop() error { return k.supervisor.Stop() } -func (k *Kubelet) prepareLocalKubeletConfig(kubeletconfig string, kubeletConfigData kubeletConfig) (string, error) { - var kubeletConfiguration kubeletv1beta1.KubeletConfiguration - err := yaml.Unmarshal([]byte(kubeletconfig), &kubeletConfiguration) - if err != nil { - return "", fmt.Errorf("can't unmarshal kubelet config: %v", err) - } - - kubeletConfiguration.Authentication.X509.ClientCAFile = kubeletConfigData.ClientCAFile // filepath.Join(k.K0sVars.CertRootDir, "ca.crt") - kubeletConfiguration.VolumePluginDir = kubeletConfigData.VolumePluginDir // k.K0sVars.KubeletVolumePluginDir - kubeletConfiguration.KubeReservedCgroup = kubeletConfigData.KubeReservedCgroup - kubeletConfiguration.KubeletCgroups = kubeletConfigData.KubeletCgroups - kubeletConfiguration.ResolverConfig = pointer.String(kubeletConfigData.ResolvConf) - kubeletConfiguration.CgroupsPerQOS = pointer.Bool(kubeletConfigData.CgroupsPerQOS) +func (k *Kubelet) prepareLocalKubeletConfig(kubeletConfigData kubeletConfig) (string, error) { + preparedConfig := k.Configuration.DeepCopy() + preparedConfig.Authentication.X509.ClientCAFile = kubeletConfigData.ClientCAFile // filepath.Join(k.K0sVars.CertRootDir, "ca.crt") + preparedConfig.VolumePluginDir = kubeletConfigData.VolumePluginDir // k.K0sVars.KubeletVolumePluginDir + preparedConfig.KubeReservedCgroup = kubeletConfigData.KubeReservedCgroup + preparedConfig.KubeletCgroups = kubeletConfigData.KubeletCgroups + preparedConfig.ResolverConfig = pointer.String(kubeletConfigData.ResolvConf) + preparedConfig.CgroupsPerQOS = pointer.Bool(kubeletConfigData.CgroupsPerQOS) if len(k.Taints) > 0 { var taints []corev1.Taint @@ -273,14 +252,14 @@ func (k *Kubelet) prepareLocalKubeletConfig(kubeletconfig string, kubeletConfigD } taints = append(taints, parsedTaint) } - kubeletConfiguration.RegisterWithTaints = taints + preparedConfig.RegisterWithTaints = taints } - localKubeletConfig, err := yaml.Marshal(kubeletConfiguration) + 
preparedConfigBytes, err := yaml.Marshal(preparedConfig) if err != nil { return "", fmt.Errorf("can't marshal kubelet config: %v", err) } - return string(localKubeletConfig), nil + return string(preparedConfigBytes), nil } const awsMetaInformationURI = "http://169.254.169.254/latest/meta-data/local-hostname" diff --git a/pkg/component/worker/kubeletconfigclient.go b/pkg/component/worker/kubeletconfigclient.go deleted file mode 100644 index c3bd6213314f..000000000000 --- a/pkg/component/worker/kubeletconfigclient.go +++ /dev/null @@ -1,58 +0,0 @@ -/* -Copyright 2022 k0s authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package worker - -import ( - "context" - "fmt" - - "github.com/k0sproject/k0s/pkg/constant" - k8sutil "github.com/k0sproject/k0s/pkg/kubernetes" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" -) - -// KubeletConfigClient is the client used to fetch kubelet config from a common config map -type KubeletConfigClient struct { - kubeClient kubernetes.Interface -} - -// NewKubeletConfigClient creates new KubeletConfigClient using the specified kubeconfig -func NewKubeletConfigClient(kubeconfigPath string) (*KubeletConfigClient, error) { - kubeClient, err := k8sutil.NewClient(kubeconfigPath) - if err != nil { - return nil, err - } - - return &KubeletConfigClient{ - kubeClient: kubeClient, - }, nil -} - -// Get reads the config from kube api -func (k *KubeletConfigClient) Get(ctx context.Context, profile string) (string, error) { - cmName := fmt.Sprintf("kubelet-config-%s-%s", profile, constant.KubernetesMajorMinorVersion) - cm, err := k.kubeClient.CoreV1().ConfigMaps("kube-system").Get(ctx, cmName, v1.GetOptions{}) - if err != nil { - return "", fmt.Errorf("failed to get kubelet config from API: %w", err) - } - config := cm.Data["kubelet"] - if config == "" { - return "", fmt.Errorf("no config found with key 'kubelet' in %s", cmName) - } - return config, nil -} diff --git a/pkg/component/worker/utils.go b/pkg/component/worker/utils.go index 69185d8b1dc2..f925c476796b 100644 --- a/pkg/component/worker/utils.go +++ b/pkg/component/worker/utils.go @@ -151,15 +151,6 @@ func BootstrapKubeletKubeconfig(ctx context.Context, k0sVars constant.CfgVars, w return nil } -func LoadKubeletConfigClient(k0svars constant.CfgVars) (*KubeletConfigClient, error) { - var kubeletConfigClient *KubeletConfigClient - kubeletConfigClient, err := NewKubeletConfigClient(k0svars.KubeletAuthConfigPath) - if err != nil { - return nil, fmt.Errorf("failed to create kubelet config client: %w", err) - } - return kubeletConfigClient, nil -} - func writeKubeletBootstrapKubeconfig(kubeconfig []byte) (string, error) { dir := os.Getenv("XDG_RUNTIME_DIR") if dir == "" && runtime.GOOS != "windows" { diff --git a/pkg/config/cli.go b/pkg/config/cli.go index a0b4f4009961..d24d40fd959c 100644 --- a/pkg/config/cli.go +++ b/pkg/config/cli.go @@ -103,6 +103,13 @@ func (o *ControllerOptions) Normalize() error { // Normalize component names var disabledComponents []string for _, disabledComponent := range 
o.DisableComponents { + if disabledComponent == constant.KubeletConfigComponentName { + logrus.Warnf("Usage of deprecated component name %q, please switch to %q", + constant.KubeletConfigComponentName, constant.WorkerConfigComponentName, + ) + disabledComponent = constant.WorkerConfigComponentName + } + if !slices.Contains(availableComponents, disabledComponent) { return fmt.Errorf("unknown component %s", disabledComponent) } @@ -190,11 +197,11 @@ var availableComponents = []string{ constant.KubeControllerManagerComponentName, constant.KubeProxyComponentName, constant.KubeSchedulerComponentName, - constant.KubeletConfigComponentName, constant.MetricsServerComponentName, constant.NetworkProviderComponentName, constant.NodeRoleComponentName, constant.SystemRbacComponentName, + constant.WorkerConfigComponentName, } func GetControllerFlags() *pflag.FlagSet { diff --git a/pkg/config/cli_test.go b/pkg/config/cli_test.go index 4d9e2c4eba3a..b4e724404f54 100644 --- a/pkg/config/cli_test.go +++ b/pkg/config/cli_test.go @@ -44,14 +44,30 @@ func TestControllerOptions_Normalize(t *testing.T) { assert.ErrorContains(t, err, "unknown component i-dont-exist") }) - t.Run("removesDuplicateComponents", func(t *testing.T) { - disabled := []string{"helm", "kube-proxy", "coredns", "kube-proxy", "autopilot"} - expected := []string{"helm", "kube-proxy", "coredns", "autopilot"} - - underTest := ControllerOptions{DisableComponents: disabled} + for _, test := range []struct { + name string + disabled, expected []string + }{ + { + "removesDuplicateComponents", + []string{"helm", "kube-proxy", "coredns", "kube-proxy", "autopilot"}, + []string{"helm", "kube-proxy", "coredns", "autopilot"}, + }, + { + "replacesDeprecation", + []string{"helm", "kubelet-config", "coredns", "kubelet-config", "autopilot"}, + []string{"helm", "worker-config", "coredns", "autopilot"}, + }, + { + "replacesDeprecationAvoidingDuplicates", + []string{"helm", "kubelet-config", "coredns", "kubelet-config", "worker-config", "autopilot"}, + []string{"helm", "worker-config", "coredns", "autopilot"}, + }, + } { + underTest := ControllerOptions{DisableComponents: test.disabled} err := underTest.Normalize() require.NoError(t, err) - assert.Equal(t, expected, underTest.DisableComponents) - }) + assert.Equal(t, test.expected, underTest.DisableComponents) + } } diff --git a/pkg/config/file_config.go b/pkg/config/file_config.go index 830302534e01..ff582c3fcf44 100644 --- a/pkg/config/file_config.go +++ b/pkg/config/file_config.go @@ -52,7 +52,13 @@ func (rules *ClientConfigLoadingRules) readRuntimeConfig() (clusterConfig *v1bet if rules.RuntimeConfigPath == "" { rules.RuntimeConfigPath = runtimeConfigPathDefault } - return rules.ParseRuntimeConfig() + + config, err := rules.ParseRuntimeConfig() + if err != nil { + return nil, fmt.Errorf("failed to parse config from %q: %w", rules.RuntimeConfigPath, err) + } + + return config, err } // generic function that reads a config file, and returns a ClusterConfig object diff --git a/pkg/constant/constant_shared.go b/pkg/constant/constant_shared.go index a5fe513d5f8c..43001684969f 100644 --- a/pkg/constant/constant_shared.go +++ b/pkg/constant/constant_shared.go @@ -106,7 +106,8 @@ const ( KubeControllerManagerComponentName = "kube-controller-manager" KubeProxyComponentName = "kube-proxy" KubeSchedulerComponentName = "kube-scheduler" - KubeletConfigComponentName = "kubelet-config" + KubeletConfigComponentName = "kubelet-config" // Deprecated: replaced by worker-config + WorkerConfigComponentName = "worker-config" 
MetricsServerComponentName = "metrics-server" NetworkProviderComponentName = "network-provider" SystemRbacComponentName = "system-rbac" diff --git a/pkg/kubernetes/client.go b/pkg/kubernetes/client.go index 15d595566931..74615cf8167c 100644 --- a/pkg/kubernetes/client.go +++ b/pkg/kubernetes/client.go @@ -183,19 +183,35 @@ func (c *ClientFactory) GetRESTConfig() *rest.Config { return c.restConfig } -// NewClient creates new k8s client based of the given kubeconfig -// This should be only used in cases where the client is "short-running" and shouldn't/cannot use the common "cached" one. -func NewClient(kubeconfig string) (kubernetes.Interface, error) { - // use the current context in kubeconfig - config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) +// KubeconfigFromFile returns a [clientcmd.KubeconfigGetter] that tries to load +// a kubeconfig from the given path. +func KubeconfigFromFile(path string) clientcmd.KubeconfigGetter { + return (&clientcmd.ClientConfigLoadingRules{ExplicitPath: path}).Load +} + +// NewClientFromFile creates a new Kubernetes client based of the given +// kubeconfig file. +func NewClientFromFile(kubeconfig string) (kubernetes.Interface, error) { + return NewClient(KubeconfigFromFile(kubeconfig)) +} + +func ClientConfig(getter clientcmd.KubeconfigGetter) (*rest.Config, error) { + kubeconfig, err := getter() if err != nil { - return nil, fmt.Errorf("failed to load kubeconfig: %w", err) + return nil, err } - clientset, err := kubernetes.NewForConfig(config) + return clientcmd.NewNonInteractiveClientConfig(*kubeconfig, "", nil, nil).ClientConfig() +} + +// NewClient creates new k8s client based of the given kubeconfig getter. This +// should be only used in cases where the client is "short-running" and +// shouldn't/cannot use the common "cached" one. +func NewClient(getter clientcmd.KubeconfigGetter) (kubernetes.Interface, error) { + config, err := ClientConfig(getter) if err != nil { - return nil, fmt.Errorf("failed to create k8s client: %w", err) + return nil, err } - return clientset, nil + return kubernetes.NewForConfig(config) } diff --git a/pkg/token/manager.go b/pkg/token/manager.go index 9c2e46fd2d37..0b69cc423183 100644 --- a/pkg/token/manager.go +++ b/pkg/token/manager.go @@ -44,7 +44,7 @@ func (t Token) ToArray() []string { // NewManager creates a new token manager using given kubeconfig func NewManager(kubeconfig string) (*Manager, error) { logrus.Debugf("loading kubeconfig from: %s", kubeconfig) - client, err := k8sutil.NewClient(kubeconfig) + client, err := k8sutil.NewClientFromFile(kubeconfig) if err != nil { return nil, err }
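
For illustration only (not part of the patch): the new `pkg/component/worker/config` package added above round-trips a worker profile through ConfigMap data, and the loader fetches a ConfigMap named `<worker-config>-<profile>-<kubernetes major.minor>` from `kube-system`. The sketch below shows that round trip using only the functions introduced in this change; the `"default"` profile name and the standalone `main` are assumptions for the example.

```go
package main

import (
	"fmt"

	workerconfig "github.com/k0sproject/k0s/pkg/component/worker/config"
	"github.com/k0sproject/k0s/pkg/constant"

	kubeletv1beta1 "k8s.io/kubelet/config/v1beta1"
	"k8s.io/utils/pointer"
)

func main() {
	// A profile as the controller-side reconciler might assemble it.
	profile := &workerconfig.Profile{
		KubeletConfiguration: kubeletv1beta1.KubeletConfiguration{
			CgroupsPerQOS: pointer.Bool(true),
		},
	}

	// Serialize into the ConfigMap data format; non-zero fields end up
	// under the "kubeletConfiguration" key as JSON.
	data, err := workerconfig.ToConfigMapData(profile)
	if err != nil {
		panic(err)
	}

	// The worker-side loader expects this naming scheme (see loader.go).
	name := fmt.Sprintf("%s-%s-%s",
		constant.WorkerConfigComponentName, "default", constant.KubernetesMajorMinorVersion)
	fmt.Printf("would publish ConfigMap kube-system/%s with %d key(s)\n", name, len(data))

	// Parse it back, as the worker does after fetching the ConfigMap.
	roundTripped, err := workerconfig.FromConfigMapData(data)
	if err != nil {
		panic(err)
	}
	fmt.Printf("cgroupsPerQOS after round trip: %v\n", *roundTripped.KubeletConfiguration.CgroupsPerQOS)
}
```

Errors from individual ConfigMap keys are collected via multierr, so a malformed `kubeletConfiguration` value surfaces as a single combined error rather than aborting on the first field.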
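
The `pkg/kubernetes/client.go` hunk above replaces the path-based `NewClient` with one that accepts a `clientcmd.KubeconfigGetter`, keeping `KubeconfigFromFile` and `NewClientFromFile` as file-backed conveniences. A minimal sketch, assuming a caller that already holds the kubeconfig in memory (this caller is hypothetical, not code from the patch), of what the getter indirection allows:

```go
package example

import (
	kubeutil "github.com/k0sproject/k0s/pkg/kubernetes"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
)

// clientFromBytes builds a Kubernetes client from raw kubeconfig bytes by
// wrapping them in a KubeconfigGetter, without touching the filesystem.
func clientFromBytes(kubeconfig []byte) (kubernetes.Interface, error) {
	getter := func() (*clientcmdapi.Config, error) {
		// clientcmd.Load parses kubeconfig bytes into an api.Config.
		return clientcmd.Load(kubeconfig)
	}
	return kubeutil.NewClient(getter)
}
```

File-based callers such as `token.NewManager` simply switch to `NewClientFromFile`, as in the final hunk above, while `ClientConfig` exposes the same getter-to-`rest.Config` step for callers that build their own clientsets.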