Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Marko Mudrinić <[email protected]>
  • Loading branch information
xmudrii committed Sep 28, 2021
1 parent c07d3bb commit ff78b0e
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 143 deletions.
120 changes: 28 additions & 92 deletions pkg/tasks/ccm_csi_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ limitations under the License.
package tasks

import (
"bytes"
"fmt"
"io"
"strconv"
"time"

Expand All @@ -29,7 +27,6 @@ import (
"k8c.io/kubeone/pkg/nodeutils"
"k8c.io/kubeone/pkg/scripts"
"k8c.io/kubeone/pkg/ssh"
"k8c.io/kubeone/pkg/ssh/sshiofs"
"k8c.io/kubeone/pkg/state"

"github.com/kubermatic/machine-controller/pkg/apis/cluster/common"
Expand Down Expand Up @@ -234,104 +231,43 @@ func ccmMigrationUpdateStaticWorkersKubeletConfigInternal(s *state.State, node *
}

func ccmMigrationUpdateKubeletConfigFile(s *state.State) error {
// Grab the Kubelet configuration file from the node
sshfs := s.Runner.NewFS()
f, err := sshfs.Open(kubeletConfigFile)
if err != nil {
return err
}
defer f.Close()

buf, err := io.ReadAll(f)
if err != nil {
return err
}

// Unmarshal and update the config
kubeletConfig, err := unmarshalKubeletConfig(buf)
if err != nil {
return err
}
return updateRemoteFile(s, kubeletConfigFile, func(content []byte) ([]byte, error) {
// Unmarshal and update the config
kubeletConfig, err := unmarshalKubeletConfig(content)
if err != nil {
return nil, err
}

if kubeletConfig.FeatureGates == nil {
kubeletConfig.FeatureGates = map[string]bool{}
}
if s.ShouldEnableCSIMigration() {
featureGates, _, fgErr := s.Cluster.CSIMigrationFeatureGates(s.ShouldUnregisterInTreeCloudProvider())
if fgErr != nil {
return fgErr
if kubeletConfig.FeatureGates == nil {
kubeletConfig.FeatureGates = map[string]bool{}
}
for k, v := range featureGates {
kubeletConfig.FeatureGates[k] = v
if s.ShouldEnableCSIMigration() {
featureGates, _, fgErr := s.Cluster.CSIMigrationFeatureGates(s.ShouldUnregisterInTreeCloudProvider())
if fgErr != nil {
return nil, fgErr
}
for k, v := range featureGates {
kubeletConfig.FeatureGates[k] = v
}
}
}

// Update the config on the node
buf, err = marshalKubeletConfig(kubeletConfig)
if err != nil {
return err
}

fw, ok := f.(sshiofs.ExtendedFile)
if !ok {
return errors.New("file is not writable")
}

if err = fw.Truncate(0); err != nil {
return err
}

if _, err = fw.Seek(0, io.SeekStart); err != nil {
return err
}

if _, err = io.Copy(fw, bytes.NewBuffer(buf)); err != nil {
return err
}

return nil
return marshalKubeletConfig(kubeletConfig)
})
}

func ccmMigrationUpdateKubeletFlags(s *state.State) error {
sshfs := s.Runner.NewFS()
f, err := sshfs.Open(kubeadmEnvFlagsFile)
if err != nil {
return err
}
defer f.Close()

buf, err := io.ReadAll(f)
if err != nil {
return err
}

kubeletFlags, err := unmarshalKubeletFlags(buf)
if err != nil {
return err
}

kubeletFlags["--cloud-provider"] = "external"
delete(kubeletFlags, "--cloud-config")

buf = marshalKubeletFlags(kubeletFlags)
fw, ok := f.(sshiofs.ExtendedFile)
if !ok {
return errors.New("file is not writable")
}

if err = fw.Truncate(0); err != nil {
return err
}

if _, err = fw.Seek(0, io.SeekStart); err != nil {
return err
}
return updateRemoteFile(s, kubeadmEnvFlagsFile, func(content []byte) ([]byte, error) {
kubeletFlags, err := unmarshalKubeletFlags(content)
if err != nil {
return nil, err
}

if _, err = io.Copy(fw, bytes.NewBuffer(buf)); err != nil {
return err
}
kubeletFlags["--cloud-provider"] = "external"
delete(kubeletFlags, "--cloud-config")

return nil
buf := marshalKubeletFlags(kubeletFlags)
return buf, nil
})
}

func waitForStaticPodReady(s *state.State, timeout time.Duration, staticPodName, staticPodNamespace string) error {
Expand Down
59 changes: 44 additions & 15 deletions pkg/tasks/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@ package tasks
import (
"bytes"
"fmt"
"io"
"sort"
"strings"

"github.com/pkg/errors"

"k8c.io/kubeone/pkg/ssh/sshiofs"
"k8c.io/kubeone/pkg/state"

kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
"sigs.k8s.io/yaml"
)
Expand All @@ -34,6 +38,44 @@ const (
kubeletConfigFile = "/var/lib/kubelet/config.yaml"
)

func updateRemoteFile(s *state.State, filePath string, modifier func(content []byte) ([]byte, error)) error {
sshfs := s.Runner.NewFS()
f, err := sshfs.Open(filePath)
if err != nil {
return err
}
defer f.Close()

buf, err := io.ReadAll(f)
if err != nil {
return err
}

buf, err = modifier(buf)
if err != nil {
return err
}

fw, ok := f.(sshiofs.ExtendedFile)
if !ok {
return errors.New("file is not writable")
}

if err = fw.Truncate(0); err != nil {
return err
}

if _, err = fw.Seek(0, io.SeekStart); err != nil {
return err
}

if _, err = io.Copy(fw, bytes.NewBuffer(buf)); err != nil {
return err
}

return nil
}

func unmarshalKubeletFlags(buf []byte) (map[string]string, error) {
// throw away KUBELET_KUBEADM_ARGS=
s1 := strings.SplitN(string(buf), "=", 2)
Expand All @@ -46,7 +88,7 @@ func unmarshalKubeletFlags(buf []byte) (map[string]string, error) {
kubeletflagsMap := map[string]string{}

for _, flg := range flagsvalues {
fl := strings.Split(flg, "=")
fl := strings.SplitN(flg, "=", 2)
if len(fl) != 2 {
return nil, errors.New("wrong split length")
}
Expand All @@ -64,20 +106,7 @@ func marshalKubeletFlags(kubeletflags map[string]string) []byte {

sort.Strings(kvpairs)

var buf bytes.Buffer
fmt.Fprintf(&buf, `%s="`, kubeletKubeadmArgsEnv)

for i, val := range kvpairs {
format := "%s "
if i == len(kvpairs)-1 {
format = "%s"
}
fmt.Fprintf(&buf, format, val)
}

buf.WriteString(`"`)

return buf.Bytes()
return []byte(fmt.Sprintf(`%s="%s"`, kubeletKubeadmArgsEnv, strings.Join(kvpairs, " ")))
}

func unmarshalKubeletConfig(configBytes []byte) (*kubeletconfigv1beta1.KubeletConfiguration, error) {
Expand Down
9 changes: 9 additions & 0 deletions pkg/tasks/containerd_test.go → pkg/tasks/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@ func Test_unmarshalKubeletFlags(t *testing.T) {
},
wantErr: false,
},
{
name: "key-values in a flag",
buf: []byte(`KUBELET_KUBEADM_ARGS="--key1=val1=test1,val2=test2 --key2=val2"`),
want: map[string]string{
"--key1": "val1=test1,val2=test2",
"--key2": "val2",
},
wantErr: false,
},
{
name: "error1",
buf: []byte{},
Expand Down
47 changes: 11 additions & 36 deletions pkg/tasks/containerd.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,12 @@ limitations under the License.
package tasks

import (
"bytes"
"errors"
"io"
"time"

"k8c.io/kubeone/pkg/apis/kubeone"
"k8c.io/kubeone/pkg/scripts"
"k8c.io/kubeone/pkg/ssh"
"k8c.io/kubeone/pkg/ssh/sshiofs"
"k8c.io/kubeone/pkg/state"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -86,45 +83,23 @@ func migrateToContainerd(s *state.State) error {
func migrateToContainerdTask(s *state.State, node *kubeone.HostConfig, conn ssh.Connection) error {
s.Logger.Info("Migrating container runtime to containerd")

sshfs := s.Runner.NewFS()
f, err := sshfs.Open(kubeadmEnvFlagsFile)
if err != nil {
return err
}
defer f.Close()
err := updateRemoteFile(s, kubeadmEnvFlagsFile, func(content []byte) ([]byte, error) {
kubeletFlags, err := unmarshalKubeletFlags(content)
if err != nil {
return nil, err
}

buf, err := io.ReadAll(f)
if err != nil {
return err
}
for k, v := range containerdKubeletFlags {
kubeletFlags[k] = v
}

kubeletFlags, err := unmarshalKubeletFlags(buf)
buf := marshalKubeletFlags(kubeletFlags)
return buf, nil
})
if err != nil {
return err
}

for k, v := range containerdKubeletFlags {
kubeletFlags[k] = v
}

buf = marshalKubeletFlags(kubeletFlags)
fw, ok := f.(sshiofs.ExtendedFile)
if !ok {
return errors.New("file is not writable")
}

if err = fw.Truncate(0); err != nil {
return err
}

if _, err = fw.Seek(0, io.SeekStart); err != nil {
return err
}

if _, err = io.Copy(fw, bytes.NewBuffer(buf)); err != nil {
return err
}

generateContainerdConfig := node.OperatingSystem != kubeone.OperatingSystemNameFlatcar
migrateScript, err := scripts.MigrateToContainerd(s.Cluster.RegistryConfiguration.InsecureRegistryAddress(), generateContainerdConfig)
if err != nil {
Expand Down

0 comments on commit ff78b0e

Please sign in to comment.