Skip to content

Commit

Permalink
CCM/CSI migration support for clusters with static worker nodes (#1544)
Browse files Browse the repository at this point in the history
* CCM/CSI migration support for clusters with static worker nodes

Signed-off-by: Marko Mudrinić <[email protected]>

* Address review comments

Signed-off-by: Marko Mudrinić <[email protected]>
  • Loading branch information
xmudrii authored Sep 28, 2021
1 parent 2e81602 commit 6130187
Show file tree
Hide file tree
Showing 6 changed files with 286 additions and 113 deletions.
8 changes: 8 additions & 0 deletions pkg/scripts/ccm_csi_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ var (
sudo kubeadm {{ .VERBOSE }} init phase kubelet-start \
--config={{ .WORK_DIR }}/cfg/master_{{ .NODE_ID }}.yaml
`)

ccmMigrationRestartKubelet = heredoc.Doc(`
sudo systemctl restart kubelet
`)
)

func CCMMigrationRegenerateControlPlaneManifests(workdir string, nodeID int, verboseFlag string) (string, error) {
Expand All @@ -48,3 +52,7 @@ func CCMMigrationUpdateKubeletConfig(workdir string, nodeID int, verboseFlag str
"VERBOSE": verboseFlag,
})
}

func CCMMigrationRestartKubelet() (string, error) {
return Render(ccmMigrationRestartKubelet, Data{})
}
138 changes: 117 additions & 21 deletions pkg/tasks/ccm_csi_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const (
provisionedByOpenStackCSICinder = "cinder.csi.openstack.org"
)

func validateExternalCloudProviderConfig(s *state.State) error {
func ccmMigrationValidateConfig(s *state.State) error {
if !s.Cluster.CloudProvider.External {
return errors.New(".cloudProvider.external must be enabled to start the migration")
}
Expand All @@ -59,14 +59,11 @@ func validateExternalCloudProviderConfig(s *state.State) error {
if s.Cluster.CloudProvider.Vsphere != nil && s.Cluster.CloudProvider.CSIConfig == "" {
return errors.New("the ccm/csi migration for vsphere requires providing csi configuration using .cloudProvider.csiConfig field")
}
if len(s.Cluster.StaticWorkers.Hosts) > 0 {
return errors.New("the ccm/csi migration for cluster with static worker nodes is currently unsupported")
}

return nil
}

func readyToCompleteMigration(s *state.State) error {
func readyToCompleteCCMMigration(s *state.State) error {
if s.DynamicClient == nil {
return errors.New("clientset not initialized")
}
Expand All @@ -92,11 +89,11 @@ func readyToCompleteMigration(s *state.State) error {
return nil
}

func regenerateControlPlaneManifests(s *state.State) error {
return s.RunTaskOnControlPlane(regenerateControlPlaneManifestsInternal, state.RunSequentially)
// ccmMigrationRegenerateControlPlaneManifests hands
// ccmMigrationRegenerateControlPlaneManifestsInternal to
// s.RunTaskOnControlPlane using the state.RunSequentially mode and returns
// the resulting error.
func ccmMigrationRegenerateControlPlaneManifests(s *state.State) error {
	return s.RunTaskOnControlPlane(
		ccmMigrationRegenerateControlPlaneManifestsInternal,
		state.RunSequentially,
	)
}

func regenerateControlPlaneManifestsInternal(s *state.State, node *kubeoneapi.HostConfig, conn ssh.Connection) error {
func ccmMigrationRegenerateControlPlaneManifestsInternal(s *state.State, node *kubeoneapi.HostConfig, conn ssh.Connection) error {
logger := s.Logger.WithField("node", node.PublicAddress)
logger.Info("Regenerating Kubernetes API server and kube-controller-manager manifests...")

Expand Down Expand Up @@ -134,11 +131,11 @@ func regenerateControlPlaneManifestsInternal(s *state.State, node *kubeoneapi.Ho
return nil
}

func updateKubeletConfig(s *state.State) error {
return s.RunTaskOnControlPlane(updateKubeletConfigInternal, state.RunSequentially)
// ccmMigrationUpdateControlPlaneKubeletConfig hands
// ccmMigrationUpdateControlPlaneKubeletConfigInternal to
// s.RunTaskOnControlPlane using the state.RunSequentially mode and returns
// the resulting error.
func ccmMigrationUpdateControlPlaneKubeletConfig(s *state.State) error {
	return s.RunTaskOnControlPlane(
		ccmMigrationUpdateControlPlaneKubeletConfigInternal,
		state.RunSequentially,
	)
}

func updateKubeletConfigInternal(s *state.State, node *kubeoneapi.HostConfig, conn ssh.Connection) error {
func ccmMigrationUpdateControlPlaneKubeletConfigInternal(s *state.State, node *kubeoneapi.HostConfig, conn ssh.Connection) error {
logger := s.Logger.WithField("node", node.PublicAddress)
logger.Info("Updating config and restarting Kubelet...")

Expand All @@ -154,6 +151,7 @@ func updateKubeletConfigInternal(s *state.State, node *kubeoneapi.HostConfig, co
return errors.Wrap(err, "failed to drain follower control plane node")
}

logger.Info("Updating Kubelet config...")
cmd, err := scripts.CCMMigrationUpdateKubeletConfig(s.WorkDir, node.ID, s.KubeadmVerboseFlag())
if err != nil {
return err
Expand All @@ -165,22 +163,65 @@ func updateKubeletConfigInternal(s *state.State, node *kubeoneapi.HostConfig, co

timeout := 2 * time.Minute
logger.Debugf("Waiting up to %s for Kubelet to become running...", timeout)
err = wait.PollImmediate(5*time.Second, 2*time.Minute, func() (bool, error) {
kubeletStatus, sErr := systemdStatus(conn, "kubelet")
if sErr != nil {
return false, sErr
}
if err := waitForKubeletReady(conn, timeout); err != nil {
return errors.Wrapf(err, "kubelet failed to start for %s", timeout)
}

if kubeletStatus&state.SystemDStatusRunning != 0 && kubeletStatus&state.SystemDStatusRestarting == 0 {
return true, nil
}
logger.Infoln("Uncordoning node...")
if err := drainer.Cordon(s.Context, node.Hostname, false); err != nil {
return errors.Wrap(err, "failed to uncordon follower control plane node")
}

return false, nil
})
return nil
}

// ccmMigrationUpdateStaticWorkersKubeletConfig hands
// ccmMigrationUpdateStaticWorkersKubeletConfigInternal to
// s.RunTaskOnStaticWorkers using the state.RunSequentially mode and returns
// the resulting error.
func ccmMigrationUpdateStaticWorkersKubeletConfig(s *state.State) error {
	return s.RunTaskOnStaticWorkers(
		ccmMigrationUpdateStaticWorkersKubeletConfigInternal,
		state.RunSequentially,
	)
}

func ccmMigrationUpdateStaticWorkersKubeletConfigInternal(s *state.State, node *kubeoneapi.HostConfig, conn ssh.Connection) error {
logger := s.Logger.WithField("node", node.PublicAddress)
logger.Info("Updating config and restarting Kubelet...")

drainer := nodeutils.NewDrainer(s.RESTConfig, logger)

logger.Infoln("Cordoning node...")
if err := drainer.Cordon(s.Context, node.Hostname, true); err != nil {
return errors.Wrap(err, "failed to cordon follower control plane node")
}

logger.Infoln("Draining node...")
if err := drainer.Drain(s.Context, node.Hostname); err != nil {
return errors.Wrap(err, "failed to drain follower control plane node")
}

// Update kubelet config and flags
logger.Info("Updating Kubelet config...")
if err := ccmMigrationUpdateKubeletConfigFile(s); err != nil {
return err
}
if err := ccmMigrationUpdateKubeletFlags(s); err != nil {
return err
}

// Restart Kubelet
logger.Info("Restarting Kubelet...")
script, err := scripts.CCMMigrationRestartKubelet()
if err != nil {
return err
}

_, _, err = s.Runner.RunRaw(script)
if err != nil {
return err
}

timeout := 2 * time.Minute
logger.Debugf("Waiting up to %s for Kubelet to become running...", timeout)
if err := waitForKubeletReady(conn, timeout); err != nil {
return errors.Wrapf(err, "kubelet failed to start for %s", timeout)
}

logger.Infoln("Uncordoning node...")
if err := drainer.Cordon(s.Context, node.Hostname, false); err != nil {
return errors.Wrap(err, "failed to uncordon follower control plane node")
Expand All @@ -189,6 +230,46 @@ func updateKubeletConfigInternal(s *state.State, node *kubeoneapi.HostConfig, co
return nil
}

// ccmMigrationUpdateKubeletConfigFile rewrites kubeletConfigFile through
// updateRemoteFile. The file content is decoded with unmarshalKubeletConfig
// and, when s.ShouldEnableCSIMigration() reports true, the feature gates
// returned by s.Cluster.CSIMigrationFeatureGates are merged into the
// configuration, overriding entries that share a key. The configuration is
// then encoded again with marshalKubeletConfig.
func ccmMigrationUpdateKubeletConfigFile(s *state.State) error {
	applyFeatureGates := func(content []byte) ([]byte, error) {
		cfg, err := unmarshalKubeletConfig(content)
		if err != nil {
			return nil, err
		}

		// Writing to a nil map panics, so make sure one exists up front.
		if cfg.FeatureGates == nil {
			cfg.FeatureGates = map[string]bool{}
		}

		if !s.ShouldEnableCSIMigration() {
			return marshalKubeletConfig(cfg)
		}

		gates, _, err := s.Cluster.CSIMigrationFeatureGates(s.ShouldUnregisterInTreeCloudProvider())
		if err != nil {
			return nil, err
		}
		for gate, enabled := range gates {
			cfg.FeatureGates[gate] = enabled
		}

		return marshalKubeletConfig(cfg)
	}

	return updateRemoteFile(s, kubeletConfigFile, applyFeatureGates)
}

// ccmMigrationUpdateKubeletFlags rewrites kubeadmEnvFlagsFile through
// updateRemoteFile so that the stored kubelet flags contain
// --cloud-provider=external and no --cloud-config entry. All other flags are
// carried over as parsed by unmarshalKubeletFlags.
func ccmMigrationUpdateKubeletFlags(s *state.State) error {
	switchToExternal := func(content []byte) ([]byte, error) {
		flags, err := unmarshalKubeletFlags(content)
		if err != nil {
			return nil, err
		}

		delete(flags, "--cloud-config")
		flags["--cloud-provider"] = "external"

		return marshalKubeletFlags(flags), nil
	}

	return updateRemoteFile(s, kubeadmEnvFlagsFile, switchToExternal)
}

func waitForStaticPodReady(s *state.State, timeout time.Duration, staticPodName, staticPodNamespace string) error {
if s.DynamicClient == nil {
return errors.New("clientset not initialized")
Expand Down Expand Up @@ -244,6 +325,21 @@ func waitForStaticPodReady(s *state.State, timeout time.Duration, staticPodName,
})
}

// waitForKubeletReady polls systemdStatus for the "kubelet" unit over conn,
// immediately and then every 5 seconds, until the status has the
// state.SystemDStatusRunning bit set and the state.SystemDStatusRestarting bit
// clear. It returns the error from systemdStatus if one occurs, or the error
// produced by wait.PollImmediate when timeout elapses first.
func waitForKubeletReady(conn ssh.Connection, timeout time.Duration) error {
	kubeletIsUp := func() (bool, error) {
		status, err := systemdStatus(conn, "kubelet")
		if err != nil {
			return false, err
		}

		running := status&state.SystemDStatusRunning != 0
		restarting := status&state.SystemDStatusRestarting != 0

		return running && !restarting, nil
	}

	return wait.PollImmediate(5*time.Second, timeout, kubeletIsUp)
}

func migrateOpenStackPVs(s *state.State) error {
if s.DynamicClient == nil {
return errors.New("dynamic client is not initialized")
Expand Down
129 changes: 129 additions & 0 deletions pkg/tasks/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
Copyright 2021 The KubeOne Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package tasks

import (
"bytes"
"fmt"
"io"
"sort"
"strings"

"github.com/pkg/errors"

"k8c.io/kubeone/pkg/ssh/sshiofs"
"k8c.io/kubeone/pkg/state"

kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
"sigs.k8s.io/yaml"
)

const (
kubeadmEnvFlagsFile = "/var/lib/kubelet/kubeadm-flags.env"
kubeletKubeadmArgsEnv = "KUBELET_KUBEADM_ARGS"
kubeletConfigFile = "/var/lib/kubelet/config.yaml"
)

// updateRemoteFile reads filePath through the file system returned by
// s.Runner.NewFS(), passes the content to modifier and replaces the file
// content with the bytes modifier returns.
//
// The file is truncated and rewritten in place, so the update is not atomic.
// An error is returned when the file cannot be opened or read, when modifier
// fails, when the opened file does not implement sshiofs.ExtendedFile, when
// truncating, seeking or writing fails, or when closing the file fails after
// an otherwise successful update.
func updateRemoteFile(s *state.State, filePath string, modifier func(content []byte) ([]byte, error)) (err error) {
	sshfs := s.Runner.NewFS()

	f, err := sshfs.Open(filePath)
	if err != nil {
		return err
	}
	// The file has just been rewritten, so a failing Close may be the only
	// report that the new content was not stored. Surface it unless an
	// earlier error is already being returned.
	defer func() {
		if closeErr := f.Close(); closeErr != nil && err == nil {
			err = closeErr
		}
	}()

	buf, err := io.ReadAll(f)
	if err != nil {
		return err
	}

	buf, err = modifier(buf)
	if err != nil {
		return err
	}

	fw, ok := f.(sshiofs.ExtendedFile)
	if !ok {
		return errors.New("file is not writable")
	}

	// Drop the old content and rewind before writing the new content.
	if err = fw.Truncate(0); err != nil {
		return err
	}

	if _, err = fw.Seek(0, io.SeekStart); err != nil {
		return err
	}

	if _, err = io.Copy(fw, bytes.NewReader(buf)); err != nil {
		return err
	}

	return nil
}

// unmarshalKubeletFlags parses env-file content of the form
// KUBELET_KUBEADM_ARGS="--flag1=value1 --flag2=value2" into a map keyed by
// flag name (leading dashes included).
//
// Whitespace around the value (for example a trailing newline after the
// closing quote) and repeated whitespace between flags are ignored, and an
// empty argument list yields an empty map. Only the first "=" of a flag
// separates its name from its value, so values may contain "=" themselves.
//
// An error is returned when buf contains no "=" at all or when a flag has no
// "=value" part.
func unmarshalKubeletFlags(buf []byte) (map[string]string, error) {
	// throw away KUBELET_KUBEADM_ARGS=
	s1 := strings.SplitN(string(buf), "=", 2)
	if len(s1) != 2 {
		return nil, errors.New("can't parse: wrong split length")
	}

	// Trim whitespace before the quotes: a trailing newline would otherwise
	// shield the closing quote from Trim and leak into the last flag value.
	envValue := strings.Trim(strings.TrimSpace(s1[1]), `"`)

	// Fields tolerates repeated separators and returns no tokens for an empty
	// value, where Split would yield empty strings that fail the check below.
	flagsvalues := strings.Fields(envValue)
	kubeletflagsMap := make(map[string]string, len(flagsvalues))

	for _, flg := range flagsvalues {
		fl := strings.SplitN(flg, "=", 2)
		if len(fl) != 2 {
			return nil, errors.New("wrong split length")
		}
		kubeletflagsMap[fl[0]] = fl[1]
	}

	return kubeletflagsMap, nil
}

// marshalKubeletFlags renders kubeletflags as a single
// KUBELET_KUBEADM_ARGS="..." assignment. Every entry becomes "key=value"; the
// entries are sorted as whole strings so the output is deterministic, and
// joined with single spaces.
func marshalKubeletFlags(kubeletflags map[string]string) []byte {
	pairs := make([]string, 0, len(kubeletflags))
	for flag, value := range kubeletflags {
		pairs = append(pairs, flag+"="+value)
	}
	sort.Strings(pairs)

	line := fmt.Sprintf(`%s="%s"`, kubeletKubeadmArgsEnv, strings.Join(pairs, " "))

	return []byte(line)
}

// unmarshalKubeletConfig decodes configBytes as YAML into a v1beta1
// KubeletConfiguration. The yaml.Unmarshal error is returned unchanged when
// decoding fails.
func unmarshalKubeletConfig(configBytes []byte) (*kubeletconfigv1beta1.KubeletConfiguration, error) {
	config := &kubeletconfigv1beta1.KubeletConfiguration{}
	if err := yaml.Unmarshal(configBytes, config); err != nil {
		return nil, err
	}

	return config, nil
}

// marshalKubeletConfig encodes config as YAML. A yaml.Marshal failure is
// wrapped with a message stating that marshalling the kubelet config failed.
func marshalKubeletConfig(config *kubeletconfigv1beta1.KubeletConfiguration) ([]byte, error) {
	out, err := yaml.Marshal(config)
	if err == nil {
		return out, nil
	}

	return nil, errors.Wrap(err, "failed to marshal kubelet config")
}
9 changes: 9 additions & 0 deletions pkg/tasks/containerd_test.go → pkg/tasks/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@ func Test_unmarshalKubeletFlags(t *testing.T) {
},
wantErr: false,
},
{
name: "key-values in a flag",
buf: []byte(`KUBELET_KUBEADM_ARGS="--key1=val1=test1,val2=test2 --key2=val2"`),
want: map[string]string{
"--key1": "val1=test1,val2=test2",
"--key2": "val2",
},
wantErr: false,
},
{
name: "error1",
buf: []byte{},
Expand Down
Loading

0 comments on commit 6130187

Please sign in to comment.