From 1b7266ae938284b0707294f08943868fd056dd20 Mon Sep 17 00:00:00 2001
From: Maciej Zimnoch
Date: Fri, 19 Feb 2021 15:07:23 +0100
Subject: [PATCH] cluster: automated sidecar upgrade (#187)

The Operator sets the actual sidecar version in the ScyllaCluster status.
If it differs from the Operator version, the sidecar container is updated.

Fixes #187
---
 .../cluster/actions/rack_synchronized.go      |  97 +++++++
 .../cluster/actions/rack_synchronized_test.go | 250 ++++++++++++++++++
 .../cluster/actions/sidecar_upgrade.go        |  62 +++++
 .../cluster/actions/sidecar_upgrade_test.go   |  78 ++++++
 .../cluster/actions/upgrade_version.go        |  13 +-
 pkg/controllers/cluster/resource/resource.go  |   2 +-
 pkg/controllers/cluster/status.go             |  15 +-
 pkg/controllers/cluster/sync.go               |  72 +++--
 pkg/controllers/cluster/sync_test.go          |  44 ++-
 pkg/controllers/cluster/util/util.go          |  13 +
 pkg/naming/constants.go                       |   3 +-
 pkg/naming/names.go                           |  34 ++-
 pkg/test/unit/helpers.go                      |  81 ++++++
 test/integration/main_test.go                 |   2 +-
 test/integration/upgrade_version_test.go      |   2 +-
 15 files changed, 721 insertions(+), 47 deletions(-)
 create mode 100644 pkg/controllers/cluster/actions/rack_synchronized.go
 create mode 100644 pkg/controllers/cluster/actions/rack_synchronized_test.go
 create mode 100644 pkg/controllers/cluster/actions/sidecar_upgrade.go
 create mode 100644 pkg/controllers/cluster/actions/sidecar_upgrade_test.go

diff --git a/pkg/controllers/cluster/actions/rack_synchronized.go b/pkg/controllers/cluster/actions/rack_synchronized.go
new file mode 100644
index 00000000000..9c7ac15ae3a
--- /dev/null
+++ b/pkg/controllers/cluster/actions/rack_synchronized.go
@@ -0,0 +1,97 @@
+// Copyright (C) 2021 ScyllaDB
+
+package actions
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/pkg/errors"
+	"github.com/scylladb/go-log"
+	scyllav1 "github.com/scylladb/scylla-operator/pkg/api/v1"
+	"github.com/scylladb/scylla-operator/pkg/controllers/cluster/util"
+	"github.com/scylladb/scylla-operator/pkg/naming"
+	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+type rackSynchronizedSubAction interface {
+	// RackUpdated should return whether the requested update is already applied to the rack StatefulSet.
+	RackUpdated(sts *appsv1.StatefulSet) (bool, error)
+	// PodUpdated should return whether the requested update is already applied to a rack Pod.
+	PodUpdated(pod *corev1.Pod) (bool, error)
+	// Update performs the desired update.
+	Update(sts *appsv1.StatefulSet) error
+	// Name of the action.
+	Name() string
+}
+
+// rackSynchronizedAction can be used to perform a multi-rack update in a synchronized manner.
+// Racks are upgraded in the same order as they appear in the ScyllaCluster Spec.
+// A single Execute call performs at most a single update.
+type rackSynchronizedAction struct {
+	subAction rackSynchronizedSubAction
+	cluster   *scyllav1.ScyllaCluster
+	logger    log.Logger
+}
+
+const (
+	RackSynchronizedActionPrefix = "rack-synchronized-"
+)
+
+func (a rackSynchronizedAction) Name() string {
+	return fmt.Sprintf("%s%s", RackSynchronizedActionPrefix, a.subAction.Name())
+}
+
+func (a rackSynchronizedAction) Execute(ctx context.Context, s *State) error {
+	for _, rack := range a.cluster.Spec.Datacenter.Racks {
+		sts, err := util.GetStatefulSetForRack(ctx, rack, a.cluster, s.kubeclient)
+		if err != nil {
+			return errors.Wrap(err, "get rack statefulset")
+		}
+
+		rackUpdated, err := a.subAction.RackUpdated(sts)
+		if err != nil {
+			return errors.Wrap(err, "determine if rack needs update")
+		}
+		if !rackUpdated {
+			upgradedSts := sts.DeepCopy()
+			if err := a.subAction.Update(upgradedSts); err != nil {
+				return errors.Wrap(err, "update rack")
+			}
+
+			// Patch rack sts and exit
+			a.logger.Info(ctx, "Patching rack definition", "rack", rack.Name)
+			if err := util.PatchStatefulSet(ctx, sts, upgradedSts, s.kubeclient); err != nil {
+				return errors.Wrap(err, "patch statefulset")
+			}
+
+			return nil
+		}
+
+		a.logger.Info(ctx, "Rack already upgraded, checking Pods", "rack", rack.Name)
+
+		pods, err := s.kubeclient.CoreV1().Pods(a.cluster.Namespace).List(ctx, metav1.ListOptions{
+			LabelSelector: naming.RackSelector(rack, a.cluster).String(),
+		})
+		if err != nil {
+			return errors.Wrap(err, "get pods")
+		}
+
+		for _, pod := range pods.Items {
+			podUpdated, err := a.subAction.PodUpdated(&pod)
+			if err != nil {
+				return errors.Wrap(err, "check if pod is updated")
+			}
+			if !podUpdated || !podReady(&pod) {
+				a.logger.Info(ctx, "Rack pod is not updated, awaiting readiness", "rack", rack.Name, "pod", pod.Name, "pod_ready", podReady(&pod), "pod_updated", podUpdated)
+				return nil
+			}
+			a.logger.Debug(ctx, "Rack Pod is up to date", "rack", rack.Name, "pod", pod.Name)
+		}
+		a.logger.Info(ctx, "Rack updated", "rack", rack.Name)
+	}
+
+	return nil
+}
diff --git a/pkg/controllers/cluster/actions/rack_synchronized_test.go b/pkg/controllers/cluster/actions/rack_synchronized_test.go
new file mode 100644
index 00000000000..ad97d30d6f7
--- /dev/null
+++ b/pkg/controllers/cluster/actions/rack_synchronized_test.go
@@ -0,0 +1,250 @@
+// Copyright (C) 2021 ScyllaDB
+
+package actions
+
+import (
+	"context"
+	"reflect"
+	"testing"
+
+	"github.com/scylladb/go-log"
+	"github.com/scylladb/scylla-operator/pkg/controllers/cluster/resource"
+	"github.com/scylladb/scylla-operator/pkg/test/unit"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
+	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/kubernetes/fake"
+)
+
+type subAction struct {
+	updateState map[string]bool
+	updates     []string
+	updateFunc  func(sts *appsv1.StatefulSet) error
+}
+
+func (a *subAction) RackUpdated(sts *appsv1.StatefulSet) (bool, error) {
+	return a.updateState[sts.Name], nil
+}
+
+func (a *subAction) PodUpdated(pod *corev1.Pod) (bool, error) {
+	return a.updateState[pod.Name], nil
+}
+
+func (a *subAction) Update(sts *appsv1.StatefulSet) error {
+	a.updates = append(a.updates, sts.Name)
+	if a.updateFunc != nil {
+		return a.updateFunc(sts)
+	}
+	return nil
+}
+
+func (a subAction) Name() string {
+	return "fake-sub-action"
+}
+
+func TestRackSynchronizedAction_SubActionUpdatesRack(t *testing.T) {
+	t.Parallel()
+	ctx := context.Background()
+	logger, _ := log.NewProduction(log.Config{
+		Level: zap.NewAtomicLevelAt(zapcore.DebugLevel),
+	})
+
+	cluster := unit.NewMultiRackCluster(1)
+	firstRack := resource.StatefulSetForRack(cluster.Spec.Datacenter.Racks[0], cluster, "image")
+
+	objects := []runtime.Object{firstRack}
+	kubeClient := fake.NewSimpleClientset(objects...)
+
+	sa := &subAction{
+		updateFunc: func(sts *appsv1.StatefulSet) error {
+			sts.Generation += 1
+			return nil
+		},
+		updateState: map[string]bool{
+			firstRack.Name: false,
+		},
+	}
+	a := rackSynchronizedAction{
+		subAction: sa,
+		cluster:   cluster,
+		logger:    logger,
+	}
+	s := &State{kubeclient: kubeClient}
+
+	if err := a.Execute(ctx, s); err != nil {
+		t.Fatal(err)
+	}
+	expectedUpdates := []string{firstRack.Name}
+	if !reflect.DeepEqual(sa.updates, expectedUpdates) {
+		t.Errorf("Expected %s updates, got %s", expectedUpdates, sa.updates)
+	}
+
+	sts, err := kubeClient.AppsV1().StatefulSets(cluster.Namespace).Get(ctx, firstRack.Name, metav1.GetOptions{})
+	if err != nil {
+		t.Errorf("Expected nil err, got %s", err)
+	}
+
+	if sts.Generation == firstRack.Generation {
+		t.Error("Expected sts update")
+	}
+}
+
+func TestRackSynchronizedAction_WaitUntilAllRackPodsEntersReadyState(t *testing.T) {
+	t.Parallel()
+	ctx := context.Background()
+	logger, _ := log.NewProduction(log.Config{
+		Level: zap.NewAtomicLevelAt(zapcore.DebugLevel),
+	})
+
+	cluster := unit.NewMultiRackCluster(2, 1)
+	firstRack := resource.StatefulSetForRack(cluster.Spec.Datacenter.Racks[0], cluster, "image")
+	secondRack := resource.StatefulSetForRack(cluster.Spec.Datacenter.Racks[1], cluster, "image")
+
+	firstRackPod := unit.PodForRack(0, cluster.Spec.Datacenter.Racks[0], cluster, unit.ReadyPod)
+	firstRackSecondPod := unit.PodForRack(1, cluster.Spec.Datacenter.Racks[0], cluster)
+	secondRackPod := unit.PodForRack(0, cluster.Spec.Datacenter.Racks[1], cluster, unit.ReadyPod)
+
+	objects := []runtime.Object{
+		firstRack, secondRack,
+		firstRackPod, firstRackSecondPod, secondRackPod,
+	}
+
+	kubeClient := fake.NewSimpleClientset(objects...)
+
+	sa := &subAction{
+		updateState: map[string]bool{
+			firstRack.Name:          true,
+			firstRackPod.Name:       true,
+			firstRackSecondPod.Name: true,
+			secondRack.Name:         false,
+			secondRackPod.Name:      false,
+		},
+	}
+	a := rackSynchronizedAction{
+		subAction: sa,
+		cluster:   cluster,
+		logger:    logger,
+	}
+	s := &State{kubeclient: kubeClient}
+
+	if err := a.Execute(ctx, s); err != nil {
+		t.Fatal(err)
+	}
+
+	expectedUpdates := []string(nil)
+	if !reflect.DeepEqual(sa.updates, expectedUpdates) {
+		t.Errorf("Expected %s updates, got %s", expectedUpdates, sa.updates)
+	}
+}
+
+func TestRackSynchronizedAction_RackAreUpgradedInSequence(t *testing.T) {
+	ctx := context.Background()
+	logger, _ := log.NewProduction(log.Config{
+		Level: zap.NewAtomicLevelAt(zapcore.DebugLevel),
+	})
+
+	cluster := unit.NewMultiRackCluster(1, 1)
+	firstRack := resource.StatefulSetForRack(cluster.Spec.Datacenter.Racks[0], cluster, "image")
+	secondRack := resource.StatefulSetForRack(cluster.Spec.Datacenter.Racks[1], cluster, "image")
+
+	firstRackPod := unit.PodForRack(0, cluster.Spec.Datacenter.Racks[0], cluster, unit.ReadyPod)
+	secondRackPod := unit.PodForRack(0, cluster.Spec.Datacenter.Racks[1], cluster, unit.ReadyPod)
+
+	objects := []runtime.Object{
+		firstRack, secondRack,
+		firstRackPod, secondRackPod,
+	}
+	kubeClient := fake.NewSimpleClientset(objects...)
+
+	ts := []struct {
+		Name            string
+		State           map[string]bool
+		ExpectedUpdates []string
+	}{
+		{
+			Name: "nothing updated",
+			State: map[string]bool{
+				firstRack.Name:     false,
+				secondRack.Name:    false,
+				firstRackPod.Name:  false,
+				secondRackPod.Name: false,
+			},
+			ExpectedUpdates: []string{firstRack.Name},
+		},
+		{
+			Name: "rack updated, pod not yet",
+			State: map[string]bool{
+				firstRack.Name:     true,
+				secondRack.Name:    false,
+				firstRackPod.Name:  false,
+				secondRackPod.Name: false,
+			},
+			ExpectedUpdates: nil,
+		},
+		{
+			Name: "first rack updated",
+			State: map[string]bool{
+				firstRack.Name:     true,
+				secondRack.Name:    false,
+				firstRackPod.Name:  true,
+				secondRackPod.Name: false,
+			},
+			ExpectedUpdates: []string{secondRack.Name},
+		},
+		{
+			Name: "second rack updated, pod not yet",
+			State: map[string]bool{
+				firstRack.Name:     true,
+				secondRack.Name:    true,
+				firstRackPod.Name:  true,
+				secondRackPod.Name: false,
+			},
+			ExpectedUpdates: nil,
+		},
+		{
+			Name: "all racks updated",
+			State: map[string]bool{
+				firstRack.Name:     true,
+				secondRack.Name:    true,
+				firstRackPod.Name:  true,
+				secondRackPod.Name: true,
+			},
+			ExpectedUpdates: nil,
+		},
+		{
+			Name: "second rack updated, first not",
+			State: map[string]bool{
+				firstRack.Name:     false,
+				secondRack.Name:    true,
+				firstRackPod.Name:  false,
+				secondRackPod.Name: true,
+			},
+			ExpectedUpdates: []string{firstRack.Name},
+		},
+	}
+
+	for i := range ts {
+		test := ts[i]
+		t.Run(test.Name, func(t *testing.T) {
+			t.Parallel()
+
+			sa := &subAction{updateState: test.State}
+			a := rackSynchronizedAction{
+				subAction: sa,
+				cluster:   cluster,
+				logger:    logger,
+			}
+			s := &State{kubeclient: kubeClient}
+
+			if err := a.Execute(ctx, s); err != nil {
+				t.Fatal(err)
+			}
+			if !reflect.DeepEqual(sa.updates, test.ExpectedUpdates) {
+				t.Errorf("Expected %s updates, got %s", test.ExpectedUpdates, sa.updates)
+			}
+		})
+	}
+}
diff --git a/pkg/controllers/cluster/actions/sidecar_upgrade.go b/pkg/controllers/cluster/actions/sidecar_upgrade.go
new file mode 100644
index 00000000000..d5e395973d3
--- /dev/null
+++ b/pkg/controllers/cluster/actions/sidecar_upgrade.go
@@ -0,0 +1,62 @@
+// Copyright (C) 2021 ScyllaDB
+
+package actions
+
+import (
+	"github.com/pkg/errors"
+	"github.com/scylladb/go-log"
+	scyllav1 "github.com/scylladb/scylla-operator/pkg/api/v1"
+	"github.com/scylladb/scylla-operator/pkg/naming"
+	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+)
+
+const (
+	SidecarVersionUpgradeAction = "sidecar-upgrade"
+)
+
+type SidecarUpgrade struct {
+	sidecar corev1.Container
+}
+
+func (a *SidecarUpgrade) RackUpdated(sts *appsv1.StatefulSet) (bool, error) {
+	return a.containerImageUpdated(sts.Spec.Template.Spec.InitContainers)
+}
+
+func (a *SidecarUpgrade) PodUpdated(pod *corev1.Pod) (bool, error) {
+	return a.containerImageUpdated(pod.Spec.InitContainers)
+}
+
+func (a *SidecarUpgrade) containerImageUpdated(containers []corev1.Container) (bool, error) {
+	sidecarIdx, err := naming.FindSidecarInjectorContainer(containers)
+	if err != nil {
+		return false, errors.Wrap(err, "find sidecar container in pod")
+	}
+
+	return containers[sidecarIdx].Image == a.sidecar.Image, nil
+}
+
+func (a *SidecarUpgrade) Update(sts *appsv1.StatefulSet) error {
+	initContainers := sts.Spec.Template.Spec.InitContainers
+	sidecarIdx, err := naming.FindSidecarInjectorContainer(initContainers)
+	if err != nil {
+		return errors.Wrap(err, "find sidecar container in existing sts")
+	}
+
+	sts.Spec.Template.Spec.InitContainers[sidecarIdx] = a.sidecar
+	return nil
+}
+
+func NewSidecarUpgrade(c *scyllav1.ScyllaCluster, sidecar corev1.Container, l log.Logger) *rackSynchronizedAction {
+	return &rackSynchronizedAction{
+		subAction: &SidecarUpgrade{
+			sidecar: sidecar,
+		},
+		cluster: c,
+		logger:  l,
+	}
+}
+
+func (a *SidecarUpgrade) Name() string {
+	return SidecarVersionUpgradeAction
+}
diff --git a/pkg/controllers/cluster/actions/sidecar_upgrade_test.go b/pkg/controllers/cluster/actions/sidecar_upgrade_test.go
new file mode 100644
index 00000000000..dfd5cd6a10c
--- /dev/null
+++ b/pkg/controllers/cluster/actions/sidecar_upgrade_test.go
@@ -0,0 +1,78 @@
+// Copyright (C) 2021 ScyllaDB
+
+package actions
+
+import (
+	"context"
+	"reflect"
+	"testing"
+
+	"github.com/scylladb/go-log"
+	"github.com/scylladb/scylla-operator/pkg/controllers/cluster/resource"
+	"github.com/scylladb/scylla-operator/pkg/naming"
+	"github.com/scylladb/scylla-operator/pkg/test/unit"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/kubernetes/fake"
+)
+
+func TestSidecarUpgradeAction(t *testing.T) {
+	const (
+		preUpdateImage  = "sidecar:old"
+		postUpdateImage = "sidecar:new"
+	)
+	var (
+		preUpdateCommand  = []string{"false"}
+		postUpdateCommand = []string{"true"}
+	)
+
+	ctx := context.Background()
+	logger, _ := log.NewProduction(log.Config{
+		Level: zap.NewAtomicLevelAt(zapcore.DebugLevel),
+	})
+
+	cluster := unit.NewMultiRackCluster(1)
+	rack := cluster.Spec.Datacenter.Racks[0]
+	rackSts := resource.StatefulSetForRack(rack, cluster, preUpdateImage)
+
+	oldSidecar := corev1.Container{
+		Name:    naming.SidecarInjectorContainerName,
+		Image:   preUpdateImage,
+		Command: preUpdateCommand,
+	}
+
+	rackPod := unit.PodForRack(0, rack, cluster, unit.ReadyPod, unit.WithInitContainer(oldSidecar))
+
+	updatedContainer := corev1.Container{
+		Name:    naming.SidecarInjectorContainerName,
+		Image:   postUpdateImage,
+		Command: postUpdateCommand,
+	}
+
+	objects := []runtime.Object{rackSts, rackPod}
+	kubeClient := fake.NewSimpleClientset(objects...)
+
+	a := NewSidecarUpgrade(cluster, updatedContainer, logger)
+	s := &State{kubeclient: kubeClient}
+
+	if err := a.Execute(ctx, s); err != nil {
+		t.Fatal(err)
+	}
+
+	sts, err := kubeClient.AppsV1().StatefulSets(cluster.Namespace).Get(ctx, rackSts.Name, metav1.GetOptions{})
+	if err != nil {
+		t.Fatalf("Expected nil err, got %s", err)
+	}
+
+	idx, err := naming.FindSidecarInjectorContainer(sts.Spec.Template.Spec.InitContainers)
+	if err != nil {
+		t.Fatalf("Expected nil err, got %s", err)
+	}
+
+	if !reflect.DeepEqual(sts.Spec.Template.Spec.InitContainers[idx], updatedContainer) {
+		t.Fatalf("Expected containers to be equal, got %+v, expected %+v", sts.Spec.Template.Spec.InitContainers[idx], updatedContainer)
+	}
+}
diff --git a/pkg/controllers/cluster/actions/upgrade_version.go b/pkg/controllers/cluster/actions/upgrade_version.go
index 2c005bbd406..7f648001b05 100644
--- a/pkg/controllers/cluster/actions/upgrade_version.go
+++ b/pkg/controllers/cluster/actions/upgrade_version.go
@@ -136,6 +136,7 @@ func (a *ClusterVersionUpgrade) upgradeProcedure(ctx context.Context) upgradePro
 	return genericUpgradeProcedure
 }
 
+// TODO:(zimnx) Refactor patch version upgrade into rack_synchronized action.
 func (a *ClusterVersionUpgrade) Execute(ctx context.Context, s *State) error {
 	a.cc = s.Client
 	a.kubeClient = s.kubeclient
@@ -193,7 +194,7 @@ func (a *ClusterVersionUpgrade) patchUpgrade(ctx context.Context) error {
 		return errors.Wrap(err, "get statefulset")
 	}
 
-	scyllaVersion, err := naming.ScyllaImage(sts.Spec.Template.Spec.Containers)
+	scyllaVersion, err := naming.ScyllaVersion(sts.Spec.Template.Spec.Containers)
 	if err != nil {
 		return errors.Wrap(err, "get scylla container version")
 	}
@@ -207,7 +208,7 @@ func (a *ClusterVersionUpgrade) patchUpgrade(ctx context.Context) error {
 	}
 
 	for _, p := range pods.Items {
-		scyllaVersion, err := naming.ScyllaImage(p.Spec.Containers)
+		scyllaVersion, err := naming.ScyllaVersion(p.Spec.Containers)
 		if err != nil {
 			return errors.Wrap(err, "get scylla container version")
 		}
@@ -228,7 +229,7 @@ func (a *ClusterVersionUpgrade) patchUpgrade(ctx context.Context) error {
 		return errors.Wrap(err, "get statefulset")
 	}
 
-	scyllaVersion, err := naming.ScyllaImage(sts.Spec.Template.Spec.Containers)
+	scyllaVersion, err := naming.ScyllaVersion(sts.Spec.Template.Spec.Containers)
 	if err != nil {
 		return errors.Wrap(err, "get scylla container version")
 	}
@@ -622,7 +623,7 @@ func (a *ClusterVersionUpgrade) nextRack(ctx context.Context) (*scyllav1.RackSpe
 	}
 
 	for _, p := range pods.Items {
-		containerVersion, err := naming.ScyllaImage(p.Spec.Containers)
+		containerVersion, err := naming.ScyllaVersion(p.Spec.Containers)
 		if err != nil {
 			return nil, errors.Wrap(err, "get scylla container version")
 		}
@@ -715,7 +716,7 @@ func (a *ClusterVersionUpgrade) nextNode(ctx context.Context) (*corev1.Pod, erro
 	})
 
 	for _, p := range pods.Items {
-		containerVersion, err := naming.ScyllaImage(p.Spec.Containers)
+		containerVersion, err := naming.ScyllaVersion(p.Spec.Containers)
 		if err != nil {
 			return nil, errors.Wrap(err, "get scylla container image version")
 		}
@@ -794,7 +795,7 @@ func (a *ClusterVersionUpgrade) nodeUpgradedConditionFunc(ctx context.Context) f
 			return false, errors.Wrap(err, "refresh pod")
 		}
 
-		ver, err := naming.ScyllaImage(node.Spec.Containers)
+		ver, err := naming.ScyllaVersion(node.Spec.Containers)
 		if err != nil {
 			return false, errors.Wrap(err, "get scylla container image")
 		}
diff --git a/pkg/controllers/cluster/resource/resource.go b/pkg/controllers/cluster/resource/resource.go
index 75c6205d8d1..50f73b93691 100644
--- a/pkg/controllers/cluster/resource/resource.go
+++ b/pkg/controllers/cluster/resource/resource.go
@@ -190,7 +190,7 @@ func StatefulSetForRack(r scyllav1.RackSpec, c *scyllav1.ScyllaCluster, sidecarI
 			Tolerations: placement.Tolerations,
 			InitContainers: []corev1.Container{
 				{
-					Name:            "sidecar-injection",
+					Name:            naming.SidecarInjectorContainerName,
 					Image:           sidecarImage,
 					ImagePullPolicy: "IfNotPresent",
 					Command: []string{
diff --git a/pkg/controllers/cluster/status.go b/pkg/controllers/cluster/status.go
index 42d84f4ee52..cf06e0d38c2 100644
--- a/pkg/controllers/cluster/status.go
+++ b/pkg/controllers/cluster/status.go
@@ -81,25 +81,14 @@ func (cc *ClusterReconciler) updateStatus(ctx context.Context, cluster *scyllav1
 		if err != nil {
 			return errors.WithStack(err)
 		}
-		idx, err := naming.FindScyllaContainer(sts.Spec.Template.Spec.Containers)
-		if err != nil {
-			return errors.WithStack(err)
-		}
-		rackStatus.Version, err = naming.ImageToVersion(sts.Spec.Template.Spec.Containers[idx].Image)
+		rackStatus.Version, err = naming.ScyllaVersion(sts.Spec.Template.Spec.Containers)
 		if err != nil {
 			return errors.WithStack(err)
 		}
 	}
 
 	// Update Upgrading condition
-	idx, err := naming.FindScyllaContainer(sts.Spec.Template.Spec.Containers)
-	if err != nil {
-		return errors.WithStack(err)
-	}
-	desiredRackVersion, err := naming.ImageToVersion(sts.Spec.Template.Spec.Containers[idx].Image)
-	if err != nil {
-		return errors.WithStack(err)
-	}
+	desiredRackVersion := cluster.Spec.Version
 	actualRackVersion := rackStatus.Version
 	if desiredRackVersion != actualRackVersion {
 		cc.Logger.Info(ctx, "Rack should be upgraded", "actual_version", actualRackVersion, "desired_version", desiredRackVersion, "rack", rack.Name)
diff --git a/pkg/controllers/cluster/sync.go b/pkg/controllers/cluster/sync.go
index f4969c2e34d..0a53d4b4dd8 100644
--- a/pkg/controllers/cluster/sync.go
+++ b/pkg/controllers/cluster/sync.go
@@ -8,6 +8,7 @@ import (
 	"github.com/scylladb/go-log"
 	scyllav1 "github.com/scylladb/scylla-operator/pkg/api/v1"
 	"github.com/scylladb/scylla-operator/pkg/controllers/cluster/actions"
+	"github.com/scylladb/scylla-operator/pkg/controllers/cluster/resource"
 	"github.com/scylladb/scylla-operator/pkg/controllers/cluster/util"
 	"github.com/scylladb/scylla-operator/pkg/naming"
 	corev1 "k8s.io/api/core/v1"
@@ -66,21 +67,22 @@ func (cc *ClusterReconciler) sync(c *scyllav1.ScyllaCluster) error {
 	}
 
 	// Calculate and execute next action
-	if act := cc.nextAction(ctx, c); act != nil {
+	if act, err := cc.nextAction(ctx, c); err != nil {
+		cc.Recorder.Event(c, corev1.EventTypeWarning, naming.ErrSyncFailed, fmt.Sprintf(MessageUpdateStatusFailed, err))
+		return errors.Wrap(err, "failed to determine next action")
+	} else if act != nil {
 		s := actions.NewState(cc.Client, cc.KubeClient, cc.Recorder)
 		logger.Debug(ctx, "New action", "name", act.Name())
-		err = act.Execute(ctx, s)
-	}
-
-	if err != nil {
-		cc.Recorder.Event(c, corev1.EventTypeWarning, naming.ErrSyncFailed, fmt.Sprintf(MessageClusterSyncFailed, errors.Cause(err)))
-		return err
+		if err := act.Execute(ctx, s); err != nil {
+			cc.Recorder.Event(c, corev1.EventTypeWarning, naming.ErrSyncFailed, fmt.Sprintf(MessageClusterSyncFailed, errors.Cause(err)))
+			return err
+		}
 	}
 
 	return nil
 }
 
-func (cc *ClusterReconciler) nextAction(ctx context.Context, cluster *scyllav1.ScyllaCluster) actions.Action {
+func (cc *ClusterReconciler) nextAction(ctx context.Context, cluster *scyllav1.ScyllaCluster) (actions.Action, error) {
 	logger := cc.Logger.With("cluster", cluster.Namespace+"/"+cluster.Name, "resourceVersion", cluster.ResourceVersion)
 
 	// Check if any rack isn't created
@@ -88,7 +90,7 @@ func (cc *ClusterReconciler) nextAction(ctx context.Context, cluster *scyllav1.S
 		// For each rack, check if a status entry exists
 		if _, ok := cluster.Status.Racks[rack.Name]; !ok {
 			logger.Info(ctx, "Next Action: Create rack", "name", rack.Name)
-			return actions.NewRackCreateAction(rack, cluster, cc.OperatorImage)
+			return actions.NewRackCreateAction(rack, cluster, cc.OperatorImage), nil
 		}
 	}
 
@@ -102,13 +104,13 @@ func (cc *ClusterReconciler) nextAction(ctx context.Context, cluster *scyllav1.S
 		if scyllav1.IsRackConditionTrue(&rackStatus, scyllav1.RackConditionTypeMemberReplacing) {
 			// Perform node replace
 			logger.Info(ctx, "Next Action: Node replace rack", "name", rack.Name)
-			return actions.NewRackReplaceNodeAction(rack, cluster, logger.Named("replace"))
+			return actions.NewRackReplaceNodeAction(rack, cluster, logger.Named("replace")), nil
 		}
 
 		if scyllav1.IsRackConditionTrue(&rackStatus, scyllav1.RackConditionTypeMemberLeaving) {
 			// Resume scale down
 			logger.Info(ctx, "Next Action: Scale-Down rack", "name", rack.Name)
-			return actions.NewRackScaleDownAction(rack, cluster)
+			return actions.NewRackScaleDownAction(rack, cluster), nil
 		}
 	}
 
@@ -117,7 +119,18 @@ func (cc *ClusterReconciler) nextAction(ctx context.Context, cluster *scyllav1.S
 		rackStatus := cluster.Status.Racks[rack.Name]
 
 		if cluster.Status.Upgrade != nil || scyllav1.IsRackConditionTrue(&rackStatus, scyllav1.RackConditionTypeUpgrading) {
-			return actions.NewClusterVersionUpgradeAction(cluster, logger)
+			return actions.NewClusterVersionUpgradeAction(cluster, logger), nil
 		}
 	}
 
+	// Check if there is a sidecar upgrade in progress
+	for _, rack := range cluster.Spec.Datacenter.Racks {
+		update, container, err := cc.sidecarUpdateNeeded(ctx, rack, cluster)
+		if err != nil {
+			return nil, errors.Wrap(err, "check if sidecar update is needed")
+		}
+		if update {
+			return actions.NewSidecarUpgrade(cluster, container, logger), nil
+		}
+	}
+
@@ -129,7 +142,7 @@ func (cc *ClusterReconciler) nextAction(ctx context.Context, cluster *scyllav1.S
 	for _, rack := range cluster.Spec.Datacenter.Racks {
 		if rack.Members < cluster.Status.Racks[rack.Name].Members {
 			logger.Info(ctx, "Next Action: Scale-Down rack", "name", rack.Name)
-			return actions.NewRackScaleDownAction(rack, cluster)
+			return actions.NewRackScaleDownAction(rack, cluster), nil
 		}
 	}
 
@@ -137,7 +150,7 @@ func (cc *ClusterReconciler) nextAction(ctx context.Context, cluster *scyllav1.S
 	for _, rack := range cluster.Spec.Datacenter.Racks {
 		if rack.Members > cluster.Status.Racks[rack.Name].Members {
 			logger.Info(ctx, "Next Action: Scale-Up rack", "name", rack.Name)
-			return actions.NewRackScaleUpAction(rack, cluster)
+			return actions.NewRackScaleUpAction(rack, cluster), nil
 		}
 	}
 
@@ -145,10 +158,37 @@ func (cc *ClusterReconciler) nextAction(ctx context.Context, cluster *scyllav1.S
 	for _, rack := range cluster.Spec.Datacenter.Racks {
 		if cluster.Spec.Version != cluster.Status.Racks[rack.Name].Version {
 			logger.Info(ctx, "Next Action: Upgrade rack", "name", rack.Name)
-			return actions.NewClusterVersionUpgradeAction(cluster, logger)
+			return actions.NewClusterVersionUpgradeAction(cluster, logger), nil
 		}
 	}
 
 	// Nothing to do
-	return nil
+	return nil, nil
+}
+
+func (cc *ClusterReconciler) sidecarUpdateNeeded(ctx context.Context, rack scyllav1.RackSpec, cluster *scyllav1.ScyllaCluster) (bool, corev1.Container, error) {
+	desiredSts := resource.StatefulSetForRack(rack, cluster, cc.OperatorImage)
+	desiredIdx, err := naming.FindSidecarInjectorContainer(desiredSts.Spec.Template.Spec.InitContainers)
+	if err != nil {
+		return false, corev1.Container{}, errors.Wrap(err, "find sidecar container in desired sts")
+	}
+
+	actualSts, err := util.GetStatefulSetForRack(ctx, rack, cluster, cc.KubeClient)
+	if err != nil {
+		return false, corev1.Container{}, errors.Wrap(err, "fetch rack sts")
+	}
+	if actualSts == nil {
+		return false, corev1.Container{}, errors.Errorf("rack sts %s not found", rack.Name)
+	}
+
+	actualIdx, err := naming.FindSidecarInjectorContainer(actualSts.Spec.Template.Spec.InitContainers)
+	if err != nil {
+		return false, corev1.Container{}, errors.Wrap(err, "find sidecar container in actual sts")
+	}
+
+	actualSidecarContainer := actualSts.Spec.Template.Spec.InitContainers[actualIdx]
+	desiredSidecarContainer := desiredSts.Spec.Template.Spec.InitContainers[desiredIdx]
+
+	return actualSidecarContainer.Image != desiredSidecarContainer.Image, desiredSidecarContainer, nil
+
 }
diff --git a/pkg/controllers/cluster/sync_test.go b/pkg/controllers/cluster/sync_test.go
index 9820d5f57a9..d458b45f1ab 100644
--- a/pkg/controllers/cluster/sync_test.go
+++ b/pkg/controllers/cluster/sync_test.go
@@ -2,19 +2,28 @@ package cluster
 
 import (
 	"context"
+	"fmt"
 	"testing"
 
 	"github.com/blang/semver"
-	scyllav1 "github.com/scylladb/scylla-operator/pkg/api/v1"
 	"github.com/scylladb/scylla-operator/pkg/controllers/cluster/actions"
+	"github.com/scylladb/scylla-operator/pkg/controllers/cluster/resource"
+	"github.com/scylladb/scylla-operator/pkg/naming"
 	"github.com/scylladb/scylla-operator/pkg/test/unit"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/kubernetes/fake"
 )
 
 func TestNextAction(t *testing.T) {
+	const (
+		operatorImage = "scylladb/scylla-operator:latest"
+	)
+
 	members := int32(3)
 	cluster := unit.NewSingleRackCluster(members)
+	rack := resource.StatefulSetForRack(cluster.Spec.Datacenter.Racks[0], cluster, operatorImage)
 
 	clusterNewRackCreate := cluster.DeepCopy()
 	clusterNewRackCreate.Spec.Datacenter.Racks = append(
@@ -52,51 +61,78 @@ func TestNextAction(t *testing.T) {
 	scyllav1.SetRackCondition(&testRackStatus, scyllav1.RackConditionTypeUpgrading)
 	clusterResumeVersionUpgrade.Status.Racks["test-rack"] = testRackStatus
 
+	clusterSidecarUpgrade := cluster.DeepCopy()
+	rackWithOldSidecarImage := rack.DeepCopy()
+	idx, err := naming.FindSidecarInjectorContainer(rackWithOldSidecarImage.Spec.Template.Spec.InitContainers)
+	if err != nil {
+		t.Fatal(err)
+	}
+	rackWithOldSidecarImage.Spec.Template.Spec.InitContainers[idx].Image = "scylladb/scylla-operator:old"
+
 	tests := []struct {
 		name           string
 		cluster        *scyllav1.ScyllaCluster
+		objects        []runtime.Object
 		expectedAction string
 		expectNoAction bool
 	}{
 		{
 			name:           "create rack",
 			cluster:        clusterNewRackCreate,
+			objects:        []runtime.Object{rack},
 			expectedAction: actions.RackCreateAction,
 		},
 		{
 			name:           "scale up existing rack",
 			cluster:        clusterExistingRackScaleUp,
+			objects:        []runtime.Object{rack},
 			expectedAction: actions.RackScaleUpAction,
 		},
 		{
 			name:           "scale down begin",
 			cluster:        clusterBeginRackScaleDown,
+			objects:        []runtime.Object{rack},
 			expectedAction: actions.RackScaleDownAction,
 		},
 		{
 			name:           "scale down resume",
 			cluster:        clusterResumeRackScaleDown,
+			objects:        []runtime.Object{rack},
 			expectedAction: actions.RackScaleDownAction,
 		},
 		{
 			name:           "patch upgrade begin",
 			cluster:        clusterBeginVersionUpgrade,
+			objects:        []runtime.Object{rack},
 			expectedAction: actions.ClusterVersionUpgradeAction,
 		},
 		{
 			name:           "patch upgrade in-progress",
 			cluster:        clusterResumeVersionUpgrade,
+			objects:        []runtime.Object{rack},
 			expectedAction: actions.ClusterVersionUpgradeAction,
 		},
+		{
+			name:           "sidecar upgrade",
+			cluster:        clusterSidecarUpgrade,
+			objects:        []runtime.Object{rackWithOldSidecarImage},
+			expectedAction: fmt.Sprintf("%s%s", actions.RackSynchronizedActionPrefix, actions.SidecarVersionUpgradeAction),
+		},
 	}
 
-	cc := &ClusterReconciler{}
-
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
+			cc := &ClusterReconciler{
+				KubeClient:    fake.NewSimpleClientset(test.objects...),
+				OperatorImage: operatorImage,
+			}
+
 			// Calculate next action
-			a := cc.nextAction(context.Background(), test.cluster)
+			a, err := cc.nextAction(context.Background(), test.cluster)
+			if err != nil {
+				t.Errorf("expected nil err, got %s", err)
+			}
 			if a == nil {
 				if test.expectNoAction {
 					return
diff --git a/pkg/controllers/cluster/util/util.go b/pkg/controllers/cluster/util/util.go
index 9ebb787148e..ee1629dca24 100644
--- a/pkg/controllers/cluster/util/util.go
+++ b/pkg/controllers/cluster/util/util.go
@@ -59,6 +59,19 @@ func GetMemberServicesForRack(ctx context.Context, r scyllav1.RackSpec, c *scyll
 	return svcList.Items, nil
 }
 
+// GetStatefulSetForRack returns the StatefulSet backing the given rack, or nil if it does not exist.
+func GetStatefulSetForRack(ctx context.Context, rack scyllav1.RackSpec, cluster *scyllav1.ScyllaCluster, kubeClient kubernetes.Interface) (*appsv1.StatefulSet, error) {
+	stsName := naming.StatefulSetNameForRack(rack, cluster)
+	sts, err := kubeClient.AppsV1().StatefulSets(cluster.Namespace).Get(ctx, stsName, metav1.GetOptions{})
+	if err != nil {
+		if apierrors.IsNotFound(err) {
+			return nil, nil
+		}
+		return nil, errors.Wrap(err, "get rack statefulset")
+	}
+	return sts, nil
+}
+
 // RefFromString is a helper function that takes a string
 // and outputs a reference to that string.
 // Useful for initializing a string pointer from a literal.
diff --git a/pkg/naming/constants.go b/pkg/naming/constants.go
index 60184b1cf25..60baf06f85f 100644
--- a/pkg/naming/constants.go
+++ b/pkg/naming/constants.go
@@ -64,7 +64,8 @@
 // Configuration Values
 const (
-	ScyllaContainerName = "scylla"
+	ScyllaContainerName          = "scylla"
+	SidecarInjectorContainerName = "sidecar-injection"
 
 	PVCTemplateName = "data"
 
diff --git a/pkg/naming/names.go b/pkg/naming/names.go
index 71cd99a0775..dcffd08befa 100644
--- a/pkg/naming/names.go
+++ b/pkg/naming/names.go
@@ -81,6 +81,7 @@ func IndexFromName(n string) (int32, error) {
 	return int32(index), nil
 }
 
+// ImageToVersion extracts the version part from a container image reference.
 func ImageToVersion(image string) (string, error) {
 	parts := strings.Split(image, ":")
 	if len(parts) != 2 || len(parts[1]) == 0 {
@@ -89,17 +90,28 @@ func ImageToVersion(image string) (string, error) {
 	return parts[1], nil
 }
 
+// FindScyllaContainer returns the index of the Scylla container in the given list.
 func FindScyllaContainer(containers []corev1.Container) (int, error) {
+	return FindContainerWithName(containers, ScyllaContainerName)
+}
+
+// FindSidecarInjectorContainer returns the index of the sidecar injector container in the given list.
+func FindSidecarInjectorContainer(containers []corev1.Container) (int, error) {
+	return FindContainerWithName(containers, SidecarInjectorContainerName)
+}
+
+// FindContainerWithName returns the index of the container with the given name.
+func FindContainerWithName(containers []corev1.Container, name string) (int, error) {
 	for idx := range containers {
-		if containers[idx].Name == ScyllaContainerName {
+		if containers[idx].Name == name {
 			return idx, nil
 		}
 	}
-	return -1, errors.New(fmt.Sprintf("Scylla Container '%s' not found", ScyllaContainerName))
+	return 0, errors.Errorf("'%s' container not found", name)
 }
 
-// ScyllaImage returns version of Scylla container.
-func ScyllaImage(containers []corev1.Container) (string, error) {
+// ScyllaVersion returns the version of the Scylla container.
+func ScyllaVersion(containers []corev1.Container) (string, error) {
 	idx, err := FindScyllaContainer(containers)
 	if err != nil {
 		return "", errors.Wrap(err, "find scylla container")
@@ -111,3 +123,17 @@
 	}
 	return version, nil
 }
+
+// SidecarVersion returns the version of the sidecar container.
+func SidecarVersion(containers []corev1.Container) (string, error) {
+	idx, err := FindSidecarInjectorContainer(containers)
+	if err != nil {
+		return "", errors.Wrap(err, "find sidecar container")
+	}
+
+	version, err := ImageToVersion(containers[idx].Image)
+	if err != nil {
+		return "", errors.Wrap(err, "parse sidecar container version")
+	}
+	return version, nil
+}
diff --git a/pkg/test/unit/helpers.go b/pkg/test/unit/helpers.go
index f270742c5dd..d4ee288f8ae 100644
--- a/pkg/test/unit/helpers.go
+++ b/pkg/test/unit/helpers.go
@@ -1,15 +1,26 @@
 package unit
 
 import (
+	"fmt"
+
 	scyllav1 "github.com/scylladb/scylla-operator/pkg/api/v1"
 	"github.com/scylladb/scylla-operator/pkg/controllers/cluster/util"
+	"github.com/scylladb/scylla-operator/pkg/naming"
+	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
+// NewSingleRackCluster returns a ScyllaCluster with a single rack.
 func NewSingleRackCluster(members int32) *scyllav1.ScyllaCluster {
 	return NewDetailedSingleRackCluster("test-cluster", "test-ns", "repo", "2.3.1", "test-dc", "test-rack", members)
 }
 
+// NewMultiRackCluster returns a ScyllaCluster with one rack per supplied member count.
+func NewMultiRackCluster(members ...int32) *scyllav1.ScyllaCluster {
+	return NewDetailedMultiRackCluster("test-cluster", "test-ns", "repo", "2.3.1", "test-dc", members...)
+}
+
+// NewDetailedSingleRackCluster returns a single-rack ScyllaCluster built from the supplied values.
 func NewDetailedSingleRackCluster(name, namespace, repo, version, dc, rack string, members int32) *scyllav1.ScyllaCluster {
 	return &scyllav1.ScyllaCluster{
 		ObjectMeta: metav1.ObjectMeta{
@@ -43,3 +54,73 @@ func NewDetailedSingleRackCluster(name, namespace, repo, version, dc, rack strin
 		},
 	}
 }
+
+// NewDetailedMultiRackCluster creates a multi-rack cluster from the supplied values.
+func NewDetailedMultiRackCluster(name, namespace, repo, version, dc string, members ...int32) *scyllav1.ScyllaCluster {
+	c := &scyllav1.ScyllaCluster{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      name,
+			Namespace: namespace,
+		},
+		Spec: scyllav1.ClusterSpec{
+			Repository: util.RefFromString(repo),
+			Version:    version,
+			Datacenter: scyllav1.DatacenterSpec{
+				Name:  dc,
+				Racks: []scyllav1.RackSpec{},
+			},
+		},
+		Status: scyllav1.ClusterStatus{
+			Racks: map[string]scyllav1.RackStatus{},
+		},
+	}
+
+	for i, m := range members {
+		rack := fmt.Sprintf("rack-%d", i)
+		c.Spec.Datacenter.Racks = append(c.Spec.Datacenter.Racks, scyllav1.RackSpec{
+			Name:    rack,
+			Members: m,
+			Storage: scyllav1.StorageSpec{
+				Capacity: "5Gi",
+			},
+		})
+		c.Status.Racks[rack] = scyllav1.RackStatus{
+			Version:      version,
+			Members:      m,
+			ReadyMembers: m,
+		}
+	}
+
+	return c
+}
+
+// PodOption allows customizing a Pod definition.
+type PodOption func(*corev1.Pod)
+
+// WithInitContainer appends custom init containers to the Pod definition.
+func WithInitContainer(containers ...corev1.Container) func(p *corev1.Pod) {
+	return func(p *corev1.Pod) {
+		p.Spec.InitContainers = append(p.Spec.InitContainers, containers...)
+	}
+}
+
+// ReadyPod adds a PodReady condition to the Pod status.
+func ReadyPod(p *corev1.Pod) {
+	p.Status.Conditions = append(p.Status.Conditions, corev1.PodCondition{Type: corev1.PodReady, Status: corev1.ConditionTrue})
+}
+
+// PodForRack returns a Pod definition bound to the given rack.
+func PodForRack(ordinal int, rack scyllav1.RackSpec, cluster *scyllav1.ScyllaCluster, options ...PodOption) *corev1.Pod {
+	p := &corev1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      fmt.Sprintf("%s-%d", naming.StatefulSetNameForRack(rack, cluster), ordinal),
+			Namespace: cluster.Namespace,
+			Labels:    naming.RackLabels(rack, cluster),
+		},
+	}
+	for _, o := range options {
+		o(p)
+	}
+
+	return p
+}
diff --git a/test/integration/main_test.go b/test/integration/main_test.go
index 3e8fbff83fb..b1d436b6252 100644
--- a/test/integration/main_test.go
+++ b/test/integration/main_test.go
@@ -63,7 +63,7 @@ func TestMain(m *testing.M) {
 		}
 	}()
 
-	options.GetOperatorOptions().Image = "scylladb/scylla-operator"
+	options.GetOperatorOptions().Image = "scylladb/scylla-operator:latest"
 	defer func() {
 		options.GetOperatorOptions().Image = ""
 	}()
diff --git a/test/integration/upgrade_version_test.go b/test/integration/upgrade_version_test.go
index 9bd636a8e24..16273737aff 100644
--- a/test/integration/upgrade_version_test.go
+++ b/test/integration/upgrade_version_test.go
@@ -300,7 +300,7 @@ func scyllaImageInRackStatefulSet(ctx context.Context, rack scyllav1.RackSpec, c
 		return "", err
 	}
 
-	ver, err := naming.ScyllaImage(sts.Spec.Template.Spec.Containers)
+	ver, err := naming.ScyllaVersion(sts.Spec.Template.Spec.Containers)
 	if err != nil {
 		return "", err
 	}
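
For illustration only, and not part of the commit above: the sketch below shows how another sub-action could plug into the rackSynchronizedSubAction contract introduced in pkg/controllers/cluster/actions/rack_synchronized.go. The pullPolicyUpdate type, its field, and its name string are hypothetical; only the four interface methods (RackUpdated, PodUpdated, Update, Name) and the wiring pattern mirror the patch.

// Hypothetical sub-action sketch: rolls out an ImagePullPolicy change one rack at a time.
package actions

import (
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
)

type pullPolicyUpdate struct {
	policy corev1.PullPolicy
}

func (a *pullPolicyUpdate) Name() string { return "pull-policy-update" }

// RackUpdated reports whether the rack StatefulSet template already uses the desired policy.
func (a *pullPolicyUpdate) RackUpdated(sts *appsv1.StatefulSet) (bool, error) {
	for _, c := range sts.Spec.Template.Spec.Containers {
		if c.ImagePullPolicy != a.policy {
			return false, nil
		}
	}
	return true, nil
}

// PodUpdated reports whether a rack Pod already runs with the desired policy.
func (a *pullPolicyUpdate) PodUpdated(pod *corev1.Pod) (bool, error) {
	for _, c := range pod.Spec.Containers {
		if c.ImagePullPolicy != a.policy {
			return false, nil
		}
	}
	return true, nil
}

// Update mutates a deep copy of the StatefulSet; rackSynchronizedAction then patches the
// rack and waits for its Pods to become ready and updated before touching the next rack.
func (a *pullPolicyUpdate) Update(sts *appsv1.StatefulSet) error {
	for i := range sts.Spec.Template.Spec.Containers {
		sts.Spec.Template.Spec.Containers[i].ImagePullPolicy = a.policy
	}
	return nil
}

Wired the same way NewSidecarUpgrade does it, e.g. rackSynchronizedAction{subAction: &pullPolicyUpdate{policy: corev1.PullAlways}, cluster: c, logger: l}; each Execute call patches at most one rack and returns early until that rack's Pods are ready and updated.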