From ff78b0e64d88a360025e4f3b66359dce8c5bab1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Mudrini=C4=87?= Date: Tue, 28 Sep 2021 14:38:06 +0200 Subject: [PATCH] Address review comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Marko Mudrinić --- pkg/tasks/ccm_csi_migration.go | 120 ++++-------------- pkg/tasks/common.go | 59 ++++++--- .../{containerd_test.go => common_test.go} | 9 ++ pkg/tasks/containerd.go | 47 ++----- 4 files changed, 92 insertions(+), 143 deletions(-) rename pkg/tasks/{containerd_test.go => common_test.go} (90%) diff --git a/pkg/tasks/ccm_csi_migration.go b/pkg/tasks/ccm_csi_migration.go index ecaf0d6d9..a7cdcb8a4 100644 --- a/pkg/tasks/ccm_csi_migration.go +++ b/pkg/tasks/ccm_csi_migration.go @@ -17,9 +17,7 @@ limitations under the License. package tasks import ( - "bytes" "fmt" - "io" "strconv" "time" @@ -29,7 +27,6 @@ import ( "k8c.io/kubeone/pkg/nodeutils" "k8c.io/kubeone/pkg/scripts" "k8c.io/kubeone/pkg/ssh" - "k8c.io/kubeone/pkg/ssh/sshiofs" "k8c.io/kubeone/pkg/state" "github.com/kubermatic/machine-controller/pkg/apis/cluster/common" @@ -234,104 +231,43 @@ func ccmMigrationUpdateStaticWorkersKubeletConfigInternal(s *state.State, node * } func ccmMigrationUpdateKubeletConfigFile(s *state.State) error { - // Grab the Kubelet configuration file from the node - sshfs := s.Runner.NewFS() - f, err := sshfs.Open(kubeletConfigFile) - if err != nil { - return err - } - defer f.Close() - - buf, err := io.ReadAll(f) - if err != nil { - return err - } - - // Unmarshal and update the config - kubeletConfig, err := unmarshalKubeletConfig(buf) - if err != nil { - return err - } + return updateRemoteFile(s, kubeletConfigFile, func(content []byte) ([]byte, error) { + // Unmarshal and update the config + kubeletConfig, err := unmarshalKubeletConfig(content) + if err != nil { + return nil, err + } - if kubeletConfig.FeatureGates == nil { - kubeletConfig.FeatureGates = map[string]bool{} - } - if s.ShouldEnableCSIMigration() { - featureGates, _, fgErr := s.Cluster.CSIMigrationFeatureGates(s.ShouldUnregisterInTreeCloudProvider()) - if fgErr != nil { - return fgErr + if kubeletConfig.FeatureGates == nil { + kubeletConfig.FeatureGates = map[string]bool{} } - for k, v := range featureGates { - kubeletConfig.FeatureGates[k] = v + if s.ShouldEnableCSIMigration() { + featureGates, _, fgErr := s.Cluster.CSIMigrationFeatureGates(s.ShouldUnregisterInTreeCloudProvider()) + if fgErr != nil { + return nil, fgErr + } + for k, v := range featureGates { + kubeletConfig.FeatureGates[k] = v + } } - } - - // Update the config on the node - buf, err = marshalKubeletConfig(kubeletConfig) - if err != nil { - return err - } - - fw, ok := f.(sshiofs.ExtendedFile) - if !ok { - return errors.New("file is not writable") - } - - if err = fw.Truncate(0); err != nil { - return err - } - - if _, err = fw.Seek(0, io.SeekStart); err != nil { - return err - } - - if _, err = io.Copy(fw, bytes.NewBuffer(buf)); err != nil { - return err - } - return nil + return marshalKubeletConfig(kubeletConfig) + }) } func ccmMigrationUpdateKubeletFlags(s *state.State) error { - sshfs := s.Runner.NewFS() - f, err := sshfs.Open(kubeadmEnvFlagsFile) - if err != nil { - return err - } - defer f.Close() - - buf, err := io.ReadAll(f) - if err != nil { - return err - } - - kubeletFlags, err := unmarshalKubeletFlags(buf) - if err != nil { - return err - } - - kubeletFlags["--cloud-provider"] = "external" - delete(kubeletFlags, "--cloud-config") - - buf = marshalKubeletFlags(kubeletFlags) - fw, ok := f.(sshiofs.ExtendedFile) - if !ok { - return errors.New("file is not writable") - } - - if err = fw.Truncate(0); err != nil { - return err - } - - if _, err = fw.Seek(0, io.SeekStart); err != nil { - return err - } + return updateRemoteFile(s, kubeadmEnvFlagsFile, func(content []byte) ([]byte, error) { + kubeletFlags, err := unmarshalKubeletFlags(content) + if err != nil { + return nil, err + } - if _, err = io.Copy(fw, bytes.NewBuffer(buf)); err != nil { - return err - } + kubeletFlags["--cloud-provider"] = "external" + delete(kubeletFlags, "--cloud-config") - return nil + buf := marshalKubeletFlags(kubeletFlags) + return buf, nil + }) } func waitForStaticPodReady(s *state.State, timeout time.Duration, staticPodName, staticPodNamespace string) error { diff --git a/pkg/tasks/common.go b/pkg/tasks/common.go index fc51e224e..fd31beec6 100644 --- a/pkg/tasks/common.go +++ b/pkg/tasks/common.go @@ -19,11 +19,15 @@ package tasks import ( "bytes" "fmt" + "io" "sort" "strings" "github.com/pkg/errors" + "k8c.io/kubeone/pkg/ssh/sshiofs" + "k8c.io/kubeone/pkg/state" + kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1" "sigs.k8s.io/yaml" ) @@ -34,6 +38,44 @@ const ( kubeletConfigFile = "/var/lib/kubelet/config.yaml" ) +func updateRemoteFile(s *state.State, filePath string, modifier func(content []byte) ([]byte, error)) error { + sshfs := s.Runner.NewFS() + f, err := sshfs.Open(filePath) + if err != nil { + return err + } + defer f.Close() + + buf, err := io.ReadAll(f) + if err != nil { + return err + } + + buf, err = modifier(buf) + if err != nil { + return err + } + + fw, ok := f.(sshiofs.ExtendedFile) + if !ok { + return errors.New("file is not writable") + } + + if err = fw.Truncate(0); err != nil { + return err + } + + if _, err = fw.Seek(0, io.SeekStart); err != nil { + return err + } + + if _, err = io.Copy(fw, bytes.NewBuffer(buf)); err != nil { + return err + } + + return nil +} + func unmarshalKubeletFlags(buf []byte) (map[string]string, error) { // throw away KUBELET_KUBEADM_ARGS= s1 := strings.SplitN(string(buf), "=", 2) @@ -46,7 +88,7 @@ func unmarshalKubeletFlags(buf []byte) (map[string]string, error) { kubeletflagsMap := map[string]string{} for _, flg := range flagsvalues { - fl := strings.Split(flg, "=") + fl := strings.SplitN(flg, "=", 2) if len(fl) != 2 { return nil, errors.New("wrong split length") } @@ -64,20 +106,7 @@ func marshalKubeletFlags(kubeletflags map[string]string) []byte { sort.Strings(kvpairs) - var buf bytes.Buffer - fmt.Fprintf(&buf, `%s="`, kubeletKubeadmArgsEnv) - - for i, val := range kvpairs { - format := "%s " - if i == len(kvpairs)-1 { - format = "%s" - } - fmt.Fprintf(&buf, format, val) - } - - buf.WriteString(`"`) - - return buf.Bytes() + return []byte(fmt.Sprintf(`%s="%s"`, kubeletKubeadmArgsEnv, strings.Join(kvpairs, " "))) } func unmarshalKubeletConfig(configBytes []byte) (*kubeletconfigv1beta1.KubeletConfiguration, error) { diff --git a/pkg/tasks/containerd_test.go b/pkg/tasks/common_test.go similarity index 90% rename from pkg/tasks/containerd_test.go rename to pkg/tasks/common_test.go index c2687b9f5..889ced09b 100644 --- a/pkg/tasks/containerd_test.go +++ b/pkg/tasks/common_test.go @@ -67,6 +67,15 @@ func Test_unmarshalKubeletFlags(t *testing.T) { }, wantErr: false, }, + { + name: "key-values in a flag", + buf: []byte(`KUBELET_KUBEADM_ARGS="--key1=val1=test1,val2=test2 --key2=val2"`), + want: map[string]string{ + "--key1": "val1=test1,val2=test2", + "--key2": "val2", + }, + wantErr: false, + }, { name: "error1", buf: []byte{}, diff --git a/pkg/tasks/containerd.go b/pkg/tasks/containerd.go index 6f34bfa4f..623d1b822 100644 --- a/pkg/tasks/containerd.go +++ b/pkg/tasks/containerd.go @@ -17,15 +17,12 @@ limitations under the License. package tasks import ( - "bytes" "errors" - "io" "time" "k8c.io/kubeone/pkg/apis/kubeone" "k8c.io/kubeone/pkg/scripts" "k8c.io/kubeone/pkg/ssh" - "k8c.io/kubeone/pkg/ssh/sshiofs" "k8c.io/kubeone/pkg/state" corev1 "k8s.io/api/core/v1" @@ -86,45 +83,23 @@ func migrateToContainerd(s *state.State) error { func migrateToContainerdTask(s *state.State, node *kubeone.HostConfig, conn ssh.Connection) error { s.Logger.Info("Migrating container runtime to containerd") - sshfs := s.Runner.NewFS() - f, err := sshfs.Open(kubeadmEnvFlagsFile) - if err != nil { - return err - } - defer f.Close() + err := updateRemoteFile(s, kubeadmEnvFlagsFile, func(content []byte) ([]byte, error) { + kubeletFlags, err := unmarshalKubeletFlags(content) + if err != nil { + return nil, err + } - buf, err := io.ReadAll(f) - if err != nil { - return err - } + for k, v := range containerdKubeletFlags { + kubeletFlags[k] = v + } - kubeletFlags, err := unmarshalKubeletFlags(buf) + buf := marshalKubeletFlags(kubeletFlags) + return buf, nil + }) if err != nil { return err } - for k, v := range containerdKubeletFlags { - kubeletFlags[k] = v - } - - buf = marshalKubeletFlags(kubeletFlags) - fw, ok := f.(sshiofs.ExtendedFile) - if !ok { - return errors.New("file is not writable") - } - - if err = fw.Truncate(0); err != nil { - return err - } - - if _, err = fw.Seek(0, io.SeekStart); err != nil { - return err - } - - if _, err = io.Copy(fw, bytes.NewBuffer(buf)); err != nil { - return err - } - generateContainerdConfig := node.OperatingSystem != kubeone.OperatingSystemNameFlatcar migrateScript, err := scripts.MigrateToContainerd(s.Cluster.RegistryConfiguration.InsecureRegistryAddress(), generateContainerdConfig) if err != nil {