From e6a68cdfcdaf1fe686de37a0b68a86e1be1e2769 Mon Sep 17 00:00:00 2001
From: Maciej Zimnoch
Date: Fri, 19 Feb 2021 15:07:23 +0100
Subject: [PATCH] cluster: automated sidecar upgrade (#187)

The Operator checks whether the sidecar version in each rack
StatefulSet is up to date. If it differs from the Operator version,
the sidecar container is updated.

Fixes #187
---
 .../cluster/actions/rack_synchronized.go      |  91 +++++++++
 .../cluster/actions/rack_synchronized_test.go | 193 ++++++++++++++++++
 .../cluster/actions/sidecar_upgrade.go        |  54 +++++
 .../cluster/actions/sidecar_upgrade_test.go   |  69 +++++++
 .../cluster/actions/upgrade_version.go        |  13 +-
 pkg/controllers/cluster/resource/resource.go  |   2 +-
 pkg/controllers/cluster/status.go             |  15 +-
 pkg/controllers/cluster/sync.go               |  71 +++++--
 pkg/controllers/cluster/sync_test.go          |  44 +++-
 pkg/controllers/cluster/util/util.go          |  10 +
 pkg/naming/constants.go                       |   3 +-
 pkg/naming/names.go                           |  34 ++-
 pkg/test/unit/helpers.go                      |  48 +++++
 test/integration/main_test.go                 |   2 +-
 test/integration/upgrade_version_test.go      |   2 +-
 15 files changed, 604 insertions(+), 47 deletions(-)
 create mode 100644 pkg/controllers/cluster/actions/rack_synchronized.go
 create mode 100644 pkg/controllers/cluster/actions/rack_synchronized_test.go
 create mode 100644 pkg/controllers/cluster/actions/sidecar_upgrade.go
 create mode 100644 pkg/controllers/cluster/actions/sidecar_upgrade_test.go

diff --git a/pkg/controllers/cluster/actions/rack_synchronized.go b/pkg/controllers/cluster/actions/rack_synchronized.go
new file mode 100644
index 00000000000..44c53674f4c
--- /dev/null
+++ b/pkg/controllers/cluster/actions/rack_synchronized.go
@@ -0,0 +1,91 @@
+// Copyright (C) 2021 ScyllaDB
+
+package actions
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/pkg/errors"
+	"github.com/scylladb/go-log"
+	scyllav1 "github.com/scylladb/scylla-operator/pkg/api/v1"
+	"github.com/scylladb/scylla-operator/pkg/controllers/cluster/util"
+	appsv1 "k8s.io/api/apps/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+type rackSynchronizedSubAction interface {
+	// RackUpdated should return whether the requested update is already applied to the rack StatefulSet.
+	RackUpdated(sts *appsv1.StatefulSet) (bool, error)
+	// Update performs the desired update.
+	Update(sts *appsv1.StatefulSet) error
+	// Name returns the name of the action.
+	Name() string
+}
+
+// rackSynchronizedAction performs a multi-rack update in a synchronized manner.
+// Racks are updated in the same order as they appear in the ScyllaCluster Spec.
+// A single Execute call performs at most one update.
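+// When the most recently updated rack is not yet ready, Execute returns early
+// and the update continues in a subsequent reconciliation.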
+type rackSynchronizedAction struct {
+	subAction rackSynchronizedSubAction
+	cluster   *scyllav1.ScyllaCluster
+	logger    log.Logger
+}
+
+const (
+	RackSynchronizedActionPrefix = "rack-synchronized-"
+)
+
+func (a rackSynchronizedAction) Name() string {
+	return fmt.Sprintf("%s%s", RackSynchronizedActionPrefix, a.subAction.Name())
+}
+
+func (a rackSynchronizedAction) Execute(ctx context.Context, s *State) error {
+	for _, rack := range a.cluster.Spec.Datacenter.Racks {
+		sts, err := util.GetStatefulSetForRack(ctx, rack, a.cluster, s.kubeclient)
+		if err != nil {
+			return errors.Wrap(err, "get rack statefulset")
+		}
+
+		rackUpdated, err := a.subAction.RackUpdated(sts)
+		if err != nil {
+			return errors.Wrap(err, "determine if rack needs update")
+		}
+		if !rackUpdated {
+			if err := a.subAction.Update(sts); err != nil {
+				return errors.Wrap(err, "update rack")
+			}
+
+			a.logger.Info(ctx, "Updating rack definition", "rack", rack.Name)
+			sts, err = s.kubeclient.AppsV1().StatefulSets(sts.Namespace).Update(ctx, sts, metav1.UpdateOptions{})
+			if err != nil {
+				// Do not raise an error in case of a conflict; the reconciliation loop will be
+				// triggered again because a new version of the STS we are watching is available.
+				if apierrors.IsConflict(err) {
+					return nil
+				}
+				return err
+			}
+
+			// Early exit; the next rack will be updated once the current one becomes ready.
+			return nil
+		}
+
+		if !a.stsReady(rack, sts) {
+			a.logger.Info(ctx, "Rack still upgrading, awaiting readiness", "rack", rack.Name)
+			return nil
+		}
+
+		a.logger.Info(ctx, "Rack updated", "rack", rack.Name)
+	}
+
+	return nil
+}
+
+func (a rackSynchronizedAction) stsReady(rack scyllav1.RackSpec, sts *appsv1.StatefulSet) bool {
+	return sts.Generation == sts.Status.ObservedGeneration &&
+		sts.Status.ReadyReplicas == rack.Members &&
+		sts.Status.UpdateRevision == sts.Status.CurrentRevision
+}
diff --git a/pkg/controllers/cluster/actions/rack_synchronized_test.go b/pkg/controllers/cluster/actions/rack_synchronized_test.go
new file mode 100644
index 00000000000..0fce3d82411
--- /dev/null
+++ b/pkg/controllers/cluster/actions/rack_synchronized_test.go
@@ -0,0 +1,193 @@
+// Copyright (C) 2021 ScyllaDB
+
+package actions
+
+import (
+	"context"
+	"reflect"
+	"testing"
+
+	"github.com/scylladb/go-log"
+	"github.com/scylladb/scylla-operator/pkg/controllers/cluster/resource"
+	"github.com/scylladb/scylla-operator/pkg/test/unit"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
+	appsv1 "k8s.io/api/apps/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/kubernetes/fake"
+)
+
+type subAction struct {
+	updateState map[string]bool
+	updates     []string
+	updateFunc  func(sts *appsv1.StatefulSet) error
+}
+
+func (a *subAction) RackUpdated(sts *appsv1.StatefulSet) (bool, error) {
+	return a.updateState[sts.Name], nil
+}
+
+func (a *subAction) Update(sts *appsv1.StatefulSet) error {
+	a.updates = append(a.updates, sts.Name)
+	if a.updateFunc != nil {
+		return a.updateFunc(sts)
+	}
+	return nil
+}
+
+func (a subAction) Name() string {
+	return "fake-sub-action"
+}
+
+func TestRackSynchronizedAction_SubActionUpdatesRack(t *testing.T) {
+	t.Parallel()
+	ctx := context.Background()
+	logger, _ := log.NewProduction(log.Config{
+		Level: zap.NewAtomicLevelAt(zapcore.DebugLevel),
+	})
+
+	cluster := unit.NewMultiRackCluster(1)
+	firstRack := resource.StatefulSetForRack(cluster.Spec.Datacenter.Racks[0], cluster, "image")
+
+	objects := []runtime.Object{firstRack}
+	kubeClient := 
fake.NewSimpleClientset(objects...)
+
+	sa := &subAction{
+		updateFunc: func(sts *appsv1.StatefulSet) error {
+			sts.Generation += 1
+			return nil
+		},
+		updateState: map[string]bool{
+			firstRack.Name: false,
+		},
+	}
+	a := rackSynchronizedAction{
+		subAction: sa,
+		cluster:   cluster,
+		logger:    logger,
+	}
+	s := &State{kubeclient: kubeClient}
+
+	if err := a.Execute(ctx, s); err != nil {
+		t.Fatal(err)
+	}
+	expectedUpdates := []string{firstRack.Name}
+	if !reflect.DeepEqual(sa.updates, expectedUpdates) {
+		t.Errorf("Expected %s updates, got %s", expectedUpdates, sa.updates)
+	}
+
+	sts, err := kubeClient.AppsV1().StatefulSets(cluster.Namespace).Get(ctx, firstRack.Name, metav1.GetOptions{})
+	if err != nil {
+		t.Errorf("cannot get statefulset: %s", err)
+	}
+
+	if sts.Generation == firstRack.Generation {
+		t.Error("Expected sts update")
+	}
+}
+
+func TestRackSynchronizedAction_RacksAreUpgradedInSequence(t *testing.T) {
+	ctx := context.Background()
+	logger, _ := log.NewProduction(log.Config{
+		Level: zap.NewAtomicLevelAt(zapcore.DebugLevel),
+	})
+
+	cluster := unit.NewMultiRackCluster(1, 1)
+	firstRack := resource.StatefulSetForRack(cluster.Spec.Datacenter.Racks[0], cluster, "image")
+	firstRackReady := firstRack.DeepCopy()
+	firstRackReady.Status.ObservedGeneration = firstRackReady.Generation
+	firstRackReady.Status.ReadyReplicas = cluster.Spec.Datacenter.Racks[0].Members
+	secondRack := resource.StatefulSetForRack(cluster.Spec.Datacenter.Racks[1], cluster, "image")
+	secondRackReady := secondRack.DeepCopy()
+	secondRackReady.Status.ObservedGeneration = secondRackReady.Generation
+	secondRackReady.Status.ReadyReplicas = cluster.Spec.Datacenter.Racks[1].Members
+
+	ts := []struct {
+		Name            string
+		Objects         []runtime.Object
+		State           map[string]bool
+		ExpectedUpdates []string
+	}{
+		{
+			Name:    "nothing updated",
+			Objects: []runtime.Object{firstRack, secondRack},
+			State: map[string]bool{
+				firstRack.Name:  false,
+				secondRack.Name: false,
+			},
+			ExpectedUpdates: []string{firstRack.Name},
+		},
+		{
+			Name:    "first rack updated, not ready",
+			Objects: []runtime.Object{firstRack, secondRack},
+			State: map[string]bool{
+				firstRack.Name:  true,
+				secondRack.Name: false,
+			},
+			ExpectedUpdates: nil,
+		},
+		{
+			Name:    "first rack updated and ready",
+			Objects: []runtime.Object{firstRackReady, secondRack},
+			State: map[string]bool{
+				firstRackReady.Name: true,
+				secondRack.Name:     false,
+			},
+			ExpectedUpdates: []string{secondRack.Name},
+		},
+		{
+			Name:    "all racks updated",
+			Objects: []runtime.Object{firstRack, secondRack},
+			State: map[string]bool{
+				firstRack.Name:  true,
+				secondRack.Name: true,
+			},
+			ExpectedUpdates: nil,
+		},
+		{
+			Name:    "second rack updated, first not",
+			Objects: []runtime.Object{firstRack, secondRack},
+			State: map[string]bool{
+				firstRack.Name:  false,
+				secondRack.Name: true,
+			},
+			ExpectedUpdates: []string{firstRack.Name},
+		},
+		{
+			Name:    "second rack updated and ready, first not",
+			Objects: []runtime.Object{firstRack, secondRackReady},
+			State: map[string]bool{
+				firstRack.Name:  false,
+				secondRack.Name: true,
+			},
+			ExpectedUpdates: []string{firstRack.Name},
+		},
+	}
+
+	for i := range ts {
+		test := ts[i]
+		t.Run(test.Name, func(t *testing.T) {
+			t.Parallel()
+
+			kubeClient := fake.NewSimpleClientset(test.Objects...)
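+			// Each subtest gets its own fake clientset, so recorded updates cannot leak between cases.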
+ + sa := &subAction{ + updateState: test.State, + } + a := rackSynchronizedAction{ + subAction: sa, + cluster: cluster, + logger: logger, + } + s := &State{kubeclient: kubeClient} + + if err := a.Execute(ctx, s); err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(sa.updates, test.ExpectedUpdates) { + t.Errorf("Expected %s updates, got %s", test.ExpectedUpdates, sa.updates) + } + }) + } +} diff --git a/pkg/controllers/cluster/actions/sidecar_upgrade.go b/pkg/controllers/cluster/actions/sidecar_upgrade.go new file mode 100644 index 00000000000..80028a32eae --- /dev/null +++ b/pkg/controllers/cluster/actions/sidecar_upgrade.go @@ -0,0 +1,54 @@ +// Copyright (C) 2021 ScyllaDB + +package actions + +import ( + "github.com/pkg/errors" + "github.com/scylladb/go-log" + scyllav1 "github.com/scylladb/scylla-operator/pkg/api/v1" + "github.com/scylladb/scylla-operator/pkg/naming" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" +) + +const ( + SidecarVersionUpgradeAction = "sidecar-upgrade" +) + +type SidecarUpgrade struct { + sidecar corev1.Container +} + +func (a *SidecarUpgrade) RackUpdated(sts *appsv1.StatefulSet) (bool, error) { + sidecarIdx, err := naming.FindSidecarInjectorContainer(sts.Spec.Template.Spec.InitContainers) + if err != nil { + return false, errors.Wrap(err, "find sidecar container in pod") + } + + return sts.Spec.Template.Spec.InitContainers[sidecarIdx].Image == a.sidecar.Image, nil +} + +func (a *SidecarUpgrade) Update(sts *appsv1.StatefulSet) error { + initContainers := sts.Spec.Template.Spec.InitContainers + sidecarIdx, err := naming.FindSidecarInjectorContainer(initContainers) + if err != nil { + return errors.Wrap(err, "find sidecar container in existing sts") + } + + sts.Spec.Template.Spec.InitContainers[sidecarIdx] = a.sidecar + return nil +} + +func NewSidecarUpgrade(c *scyllav1.ScyllaCluster, sidecar corev1.Container, l log.Logger) *rackSynchronizedAction { + return &rackSynchronizedAction{ + subAction: &SidecarUpgrade{ + sidecar: sidecar, + }, + cluster: c, + logger: l, + } +} + +func (a *SidecarUpgrade) Name() string { + return SidecarVersionUpgradeAction +} diff --git a/pkg/controllers/cluster/actions/sidecar_upgrade_test.go b/pkg/controllers/cluster/actions/sidecar_upgrade_test.go new file mode 100644 index 00000000000..305da9a79bc --- /dev/null +++ b/pkg/controllers/cluster/actions/sidecar_upgrade_test.go @@ -0,0 +1,69 @@ +// Copyright (C) 2021 ScyllaDB + +package actions + +import ( + "context" + "reflect" + "testing" + + "github.com/scylladb/go-log" + "github.com/scylladb/scylla-operator/pkg/controllers/cluster/resource" + "github.com/scylladb/scylla-operator/pkg/naming" + "github.com/scylladb/scylla-operator/pkg/test/unit" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/fake" +) + +func TestSidecarUpgradeAction(t *testing.T) { + const ( + preUpdateImage = "sidecar:old" + postUpdateImage = "sidecar:new" + ) + var ( + postUpdateCommand = []string{"true"} + ) + + ctx := context.Background() + logger, _ := log.NewProduction(log.Config{ + Level: zap.NewAtomicLevelAt(zapcore.DebugLevel), + }) + + cluster := unit.NewMultiRackCluster(1) + rack := cluster.Spec.Datacenter.Racks[0] + rackSts := resource.StatefulSetForRack(rack, cluster, preUpdateImage) + + updatedContainer := corev1.Container{ + Name: naming.SidecarInjectorContainerName, + Image: postUpdateImage, + Command: postUpdateCommand, + } + + objects := 
[]runtime.Object{rackSts}
+	kubeClient := fake.NewSimpleClientset(objects...)
+
+	a := NewSidecarUpgrade(cluster, updatedContainer, logger)
+	s := &State{kubeclient: kubeClient}
+
+	if err := a.Execute(ctx, s); err != nil {
+		t.Fatal(err)
+	}
+
+	sts, err := kubeClient.AppsV1().StatefulSets(cluster.Namespace).Get(ctx, rackSts.Name, metav1.GetOptions{})
+	if err != nil {
+		t.Fatalf("Expected nil err, got %s", err)
+	}
+
+	idx, err := naming.FindSidecarInjectorContainer(sts.Spec.Template.Spec.InitContainers)
+	if err != nil {
+		t.Fatalf("Expected nil err, got %s", err)
+	}
+
+	if !reflect.DeepEqual(sts.Spec.Template.Spec.InitContainers[idx], updatedContainer) {
+		t.Fatalf("Expected containers to be equal, got %+v, expected %+v", sts.Spec.Template.Spec.InitContainers[idx], updatedContainer)
+	}
+}
diff --git a/pkg/controllers/cluster/actions/upgrade_version.go b/pkg/controllers/cluster/actions/upgrade_version.go
index 2c005bbd406..7f648001b05 100644
--- a/pkg/controllers/cluster/actions/upgrade_version.go
+++ b/pkg/controllers/cluster/actions/upgrade_version.go
@@ -136,6 +136,7 @@ func (a *ClusterVersionUpgrade) upgradeProcedure(ctx context.Context) upgradePro
 	return genericUpgradeProcedure
 }
 
+// TODO(zimnx): Refactor patch version upgrade into rack_synchronized action.
 func (a *ClusterVersionUpgrade) Execute(ctx context.Context, s *State) error {
 	a.cc = s.Client
 	a.kubeClient = s.kubeclient
@@ -193,7 +194,7 @@ func (a *ClusterVersionUpgrade) patchUpgrade(ctx context.Context) error {
 		return errors.Wrap(err, "get statefulset")
 	}
 
-	scyllaVersion, err := naming.ScyllaImage(sts.Spec.Template.Spec.Containers)
+	scyllaVersion, err := naming.ScyllaVersion(sts.Spec.Template.Spec.Containers)
 	if err != nil {
 		return errors.Wrap(err, "get scylla container version")
 	}
@@ -207,7 +208,7 @@ func (a *ClusterVersionUpgrade) patchUpgrade(ctx context.Context) error {
 	}
 
 	for _, p := range pods.Items {
-		scyllaVersion, err := naming.ScyllaImage(p.Spec.Containers)
+		scyllaVersion, err := naming.ScyllaVersion(p.Spec.Containers)
 		if err != nil {
 			return errors.Wrap(err, "get scylla container version")
 		}
@@ -228,7 +229,7 @@ func (a *ClusterVersionUpgrade) patchUpgrade(ctx context.Context) error {
 		return errors.Wrap(err, "get statefulset")
 	}
 
-	scyllaVersion, err := naming.ScyllaImage(sts.Spec.Template.Spec.Containers)
+	scyllaVersion, err := naming.ScyllaVersion(sts.Spec.Template.Spec.Containers)
 	if err != nil {
 		return errors.Wrap(err, "get scylla container version")
 	}
@@ -622,7 +623,7 @@ func (a *ClusterVersionUpgrade) nextRack(ctx context.Context) (*scyllav1.RackSpe
 	}
 
 	for _, p := range pods.Items {
-		containerVersion, err := naming.ScyllaImage(p.Spec.Containers)
+		containerVersion, err := naming.ScyllaVersion(p.Spec.Containers)
 		if err != nil {
 			return nil, errors.Wrap(err, "get scylla container version")
 		}
@@ -715,7 +716,7 @@ func (a *ClusterVersionUpgrade) nextNode(ctx context.Context) (*corev1.Pod, erro
 	})
 
 	for _, p := range pods.Items {
-		containerVersion, err := naming.ScyllaImage(p.Spec.Containers)
+		containerVersion, err := naming.ScyllaVersion(p.Spec.Containers)
 		if err != nil {
 			return nil, errors.Wrap(err, "get scylla container image version")
 		}
@@ -794,7 +795,7 @@ func (a *ClusterVersionUpgrade) nodeUpgradedConditionFunc(ctx context.Context) f
 			return false, errors.Wrap(err, "refresh pod")
 		}
 
-		ver, err := naming.ScyllaImage(node.Spec.Containers)
+		ver, err := naming.ScyllaVersion(node.Spec.Containers)
 		if err != nil {
 			return false, errors.Wrap(err, "get scylla container image")
 		}
diff --git 
a/pkg/controllers/cluster/resource/resource.go b/pkg/controllers/cluster/resource/resource.go
index 75c6205d8d1..50f73b93691 100644
--- a/pkg/controllers/cluster/resource/resource.go
+++ b/pkg/controllers/cluster/resource/resource.go
@@ -190,7 +190,7 @@ func StatefulSetForRack(r scyllav1.RackSpec, c *scyllav1.ScyllaCluster, sidecarI
 					Tolerations: placement.Tolerations,
 					InitContainers: []corev1.Container{
 						{
-							Name:            "sidecar-injection",
+							Name:            naming.SidecarInjectorContainerName,
 							Image:           sidecarImage,
 							ImagePullPolicy: "IfNotPresent",
 							Command: []string{
diff --git a/pkg/controllers/cluster/status.go b/pkg/controllers/cluster/status.go
index 42d84f4ee52..cf06e0d38c2 100644
--- a/pkg/controllers/cluster/status.go
+++ b/pkg/controllers/cluster/status.go
@@ -81,25 +81,14 @@ func (cc *ClusterReconciler) updateStatus(ctx context.Context, cluster *scyllav1
 		if err != nil {
 			return errors.WithStack(err)
 		}
-		idx, err := naming.FindScyllaContainer(sts.Spec.Template.Spec.Containers)
-		if err != nil {
-			return errors.WithStack(err)
-		}
-		rackStatus.Version, err = naming.ImageToVersion(sts.Spec.Template.Spec.Containers[idx].Image)
+		rackStatus.Version, err = naming.ScyllaVersion(sts.Spec.Template.Spec.Containers)
 		if err != nil {
 			return errors.WithStack(err)
 		}
 	}
 
 	// Update Upgrading condition
-	idx, err := naming.FindScyllaContainer(sts.Spec.Template.Spec.Containers)
-	if err != nil {
-		return errors.WithStack(err)
-	}
-	desiredRackVersion, err := naming.ImageToVersion(sts.Spec.Template.Spec.Containers[idx].Image)
-	if err != nil {
-		return errors.WithStack(err)
-	}
+	desiredRackVersion := cluster.Spec.Version
 	actualRackVersion := rackStatus.Version
 	if desiredRackVersion != actualRackVersion {
 		cc.Logger.Info(ctx, "Rack should be upgraded", "actual_version", actualRackVersion, "desired_version", desiredRackVersion, "rack", rack.Name)
diff --git a/pkg/controllers/cluster/sync.go b/pkg/controllers/cluster/sync.go
index f4969c2e34d..4a205dad306 100644
--- a/pkg/controllers/cluster/sync.go
+++ b/pkg/controllers/cluster/sync.go
@@ -8,6 +8,7 @@ import (
 	"github.com/scylladb/go-log"
 	scyllav1 "github.com/scylladb/scylla-operator/pkg/api/v1"
 	"github.com/scylladb/scylla-operator/pkg/controllers/cluster/actions"
+	"github.com/scylladb/scylla-operator/pkg/controllers/cluster/resource"
 	"github.com/scylladb/scylla-operator/pkg/controllers/cluster/util"
 	"github.com/scylladb/scylla-operator/pkg/naming"
 	corev1 "k8s.io/api/core/v1"
@@ -66,21 +67,22 @@ func (cc *ClusterReconciler) sync(c *scyllav1.ScyllaCluster) error {
 	}
 
 	// Calculate and execute next action
-	if act := cc.nextAction(ctx, c); act != nil {
+	if act, err := cc.nextAction(ctx, c); err != nil {
+		cc.Recorder.Event(c, corev1.EventTypeWarning, naming.ErrSyncFailed, fmt.Sprintf(MessageClusterSyncFailed, err))
+		return errors.Wrap(err, "failed to determine next action")
+	} else if act != nil {
 		s := actions.NewState(cc.Client, cc.KubeClient, cc.Recorder)
 		logger.Debug(ctx, "New action", "name", act.Name())
-		err = act.Execute(ctx, s)
-	}
-
-	if err != nil {
-		cc.Recorder.Event(c, corev1.EventTypeWarning, naming.ErrSyncFailed, fmt.Sprintf(MessageClusterSyncFailed, errors.Cause(err)))
-		return err
+		if err := act.Execute(ctx, s); err != nil {
+			cc.Recorder.Event(c, corev1.EventTypeWarning, naming.ErrSyncFailed, fmt.Sprintf(MessageClusterSyncFailed, errors.Cause(err)))
+			return err
+		}
 	}
 
 	return nil
 }
 
-func (cc *ClusterReconciler) nextAction(ctx context.Context, cluster *scyllav1.ScyllaCluster) actions.Action {
+func (cc *ClusterReconciler) nextAction(ctx context.Context, 
cluster *scyllav1.ScyllaCluster) (actions.Action, error) {
 	logger := cc.Logger.With("cluster", cluster.Namespace+"/"+cluster.Name, "resourceVersion", cluster.ResourceVersion)
 
 	// Check if any rack isn't created
@@ -88,7 +90,7 @@ func (cc *ClusterReconciler) nextAction(ctx context.Context, cluster *scyllav1.S
 		// For each rack, check if a status entry exists
 		if _, ok := cluster.Status.Racks[rack.Name]; !ok {
 			logger.Info(ctx, "Next Action: Create rack", "name", rack.Name)
-			return actions.NewRackCreateAction(rack, cluster, cc.OperatorImage)
+			return actions.NewRackCreateAction(rack, cluster, cc.OperatorImage), nil
 		}
 	}
 
@@ -102,13 +104,13 @@ func (cc *ClusterReconciler) nextAction(ctx context.Context, cluster *scyllav1.S
 		if scyllav1.IsRackConditionTrue(&rackStatus, scyllav1.RackConditionTypeMemberReplacing) {
 			// Perform node replace
 			logger.Info(ctx, "Next Action: Node replace rack", "name", rack.Name)
-			return actions.NewRackReplaceNodeAction(rack, cluster, logger.Named("replace"))
+			return actions.NewRackReplaceNodeAction(rack, cluster, logger.Named("replace")), nil
 		}
 
 		if scyllav1.IsRackConditionTrue(&rackStatus, scyllav1.RackConditionTypeMemberLeaving) {
 			// Resume scale down
 			logger.Info(ctx, "Next Action: Scale-Down rack", "name", rack.Name)
-			return actions.NewRackScaleDownAction(rack, cluster)
+			return actions.NewRackScaleDownAction(rack, cluster), nil
 		}
 	}
 
@@ -117,7 +119,18 @@ func (cc *ClusterReconciler) nextAction(ctx context.Context, cluster *scyllav1.S
 		rackStatus := cluster.Status.Racks[rack.Name]
 		if cluster.Status.Upgrade != nil ||
 			scyllav1.IsRackConditionTrue(&rackStatus, scyllav1.RackConditionTypeUpgrading) {
-			return actions.NewClusterVersionUpgradeAction(cluster, logger)
+			return actions.NewClusterVersionUpgradeAction(cluster, logger), nil
+		}
+	}
+
+	// Check if a sidecar upgrade is needed
+	for _, rack := range cluster.Spec.Datacenter.Racks {
+		update, container, err := cc.sidecarUpdateNeeded(ctx, rack, cluster)
+		if err != nil {
+			return nil, errors.Wrap(err, "check if sidecar update is needed")
+		}
+		if update {
+			return actions.NewSidecarUpgrade(cluster, container, logger), nil
 		}
 	}
 
@@ -129,7 +142,7 @@ func (cc *ClusterReconciler) nextAction(ctx context.Context, cluster *scyllav1.S
 	for _, rack := range cluster.Spec.Datacenter.Racks {
 		if rack.Members < cluster.Status.Racks[rack.Name].Members {
 			logger.Info(ctx, "Next Action: Scale-Down rack", "name", rack.Name)
-			return actions.NewRackScaleDownAction(rack, cluster)
+			return actions.NewRackScaleDownAction(rack, cluster), nil
 		}
 	}
 
@@ -137,7 +150,7 @@ func (cc *ClusterReconciler) nextAction(ctx context.Context, cluster *scyllav1.S
 	for _, rack := range cluster.Spec.Datacenter.Racks {
 		if rack.Members > cluster.Status.Racks[rack.Name].Members {
 			logger.Info(ctx, "Next Action: Scale-Up rack", "name", rack.Name)
-			return actions.NewRackScaleUpAction(rack, cluster)
+			return actions.NewRackScaleUpAction(rack, cluster), nil
 		}
 	}
 
@@ -145,10 +158,36 @@ func (cc *ClusterReconciler) nextAction(ctx context.Context, cluster *scyllav1.S
 	for _, rack := range cluster.Spec.Datacenter.Racks {
 		if cluster.Spec.Version != cluster.Status.Racks[rack.Name].Version {
 			logger.Info(ctx, "Next Action: Upgrade rack", "name", rack.Name)
-			return actions.NewClusterVersionUpgradeAction(cluster, logger)
+			return actions.NewClusterVersionUpgradeAction(cluster, logger), nil
 		}
 	}
 
 	// Nothing to do
-	return nil
+	return nil, nil
+}
+
+func (cc *ClusterReconciler) sidecarUpdateNeeded(ctx context.Context, rack scyllav1.RackSpec, cluster *scyllav1.ScyllaCluster) (bool, 
corev1.Container, error) { + desiredSts := resource.StatefulSetForRack(rack, cluster, cc.OperatorImage) + desiredIdx, err := naming.FindSidecarInjectorContainer(desiredSts.Spec.Template.Spec.InitContainers) + if err != nil { + return false, corev1.Container{}, errors.Wrap(err, "find sidecar container in desired sts") + } + + actualSts, err := util.GetStatefulSetForRack(ctx, rack, cluster, cc.KubeClient) + if err != nil { + return false, corev1.Container{}, errors.Wrap(err, "fetch rack sts") + } + + actualIdx, err := naming.FindSidecarInjectorContainer(actualSts.Spec.Template.Spec.InitContainers) + if err != nil { + return false, corev1.Container{}, errors.Wrap(err, "find sidecar container in actual sts") + } + + actualSidecarContainer := actualSts.Spec.Template.Spec.InitContainers[actualIdx] + desiredSidecarContainer := desiredSts.Spec.Template.Spec.InitContainers[desiredIdx] + + cc.Logger.Debug(ctx, "Sidecar update", "rack", rack.Name, "desired_image", desiredSidecarContainer.Image, "actual_image", actualSidecarContainer.Image) + + return actualSidecarContainer.Image != desiredSidecarContainer.Image, desiredSidecarContainer, nil + } diff --git a/pkg/controllers/cluster/sync_test.go b/pkg/controllers/cluster/sync_test.go index 9820d5f57a9..d458b45f1ab 100644 --- a/pkg/controllers/cluster/sync_test.go +++ b/pkg/controllers/cluster/sync_test.go @@ -2,19 +2,28 @@ package cluster import ( "context" + "fmt" "testing" "github.com/blang/semver" - scyllav1 "github.com/scylladb/scylla-operator/pkg/api/v1" "github.com/scylladb/scylla-operator/pkg/controllers/cluster/actions" + "github.com/scylladb/scylla-operator/pkg/controllers/cluster/resource" + "github.com/scylladb/scylla-operator/pkg/naming" "github.com/scylladb/scylla-operator/pkg/test/unit" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/fake" ) func TestNextAction(t *testing.T) { + const ( + operatorImage = "scylladb/scylla-operator:latest" + ) + members := int32(3) cluster := unit.NewSingleRackCluster(members) + rack := resource.StatefulSetForRack(cluster.Spec.Datacenter.Racks[0], cluster, operatorImage) clusterNewRackCreate := cluster.DeepCopy() clusterNewRackCreate.Spec.Datacenter.Racks = append( @@ -52,51 +61,78 @@ func TestNextAction(t *testing.T) { scyllav1.SetRackCondition(&testRackStatus, scyllav1.RackConditionTypeUpgrading) clusterResumeVersionUpgrade.Status.Racks["test-rack"] = testRackStatus + clusterSidecarUpgrade := cluster.DeepCopy() + rackWithOldSidecarImage := rack.DeepCopy() + idx, err := naming.FindSidecarInjectorContainer(rackWithOldSidecarImage.Spec.Template.Spec.InitContainers) + if err != nil { + t.Fatal(err) + } + rackWithOldSidecarImage.Spec.Template.Spec.InitContainers[idx].Image = "scylladb/scylla-operator:old" + tests := []struct { name string cluster *scyllav1.ScyllaCluster + objects []runtime.Object expectedAction string expectNoAction bool }{ { name: "create rack", cluster: clusterNewRackCreate, + objects: []runtime.Object{rack}, expectedAction: actions.RackCreateAction, }, { name: "scale up existing rack", cluster: clusterExistingRackScaleUp, + objects: []runtime.Object{rack}, expectedAction: actions.RackScaleUpAction, }, { name: "scale down begin", cluster: clusterBeginRackScaleDown, + objects: []runtime.Object{rack}, expectedAction: actions.RackScaleDownAction, }, { name: "scale down resume", cluster: clusterResumeRackScaleDown, + objects: []runtime.Object{rack}, expectedAction: actions.RackScaleDownAction, }, { name: "patch upgrade begin", cluster: clusterBeginVersionUpgrade, + objects: 
[]runtime.Object{rack},
 			expectedAction: actions.ClusterVersionUpgradeAction,
 		},
 		{
 			name:           "patch upgrade in-progress",
 			cluster:        clusterResumeVersionUpgrade,
+			objects:        []runtime.Object{rack},
 			expectedAction: actions.ClusterVersionUpgradeAction,
 		},
+		{
+			name:           "sidecar upgrade",
+			cluster:        clusterSidecarUpgrade,
+			objects:        []runtime.Object{rackWithOldSidecarImage},
+			expectedAction: fmt.Sprintf("%s%s", actions.RackSynchronizedActionPrefix, actions.SidecarVersionUpgradeAction),
+		},
 	}
 
-	cc := &ClusterReconciler{}
-
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
+			cc := &ClusterReconciler{
+				KubeClient:    fake.NewSimpleClientset(test.objects...),
+				OperatorImage: operatorImage,
+			}
+
 			// Calculate next action
-			a := cc.nextAction(context.Background(), test.cluster)
+			a, err := cc.nextAction(context.Background(), test.cluster)
+			if err != nil {
+				t.Errorf("expected nil err, got %s", err)
+			}
 			if a == nil {
 				if test.expectNoAction {
 					return
diff --git a/pkg/controllers/cluster/util/util.go b/pkg/controllers/cluster/util/util.go
index 9ebb787148e..c005c0b4357 100644
--- a/pkg/controllers/cluster/util/util.go
+++ b/pkg/controllers/cluster/util/util.go
@@ -59,6 +59,16 @@ func GetMemberServicesForRack(ctx context.Context, r scyllav1.RackSpec, c *scyll
 	return svcList.Items, nil
 }
 
+// GetStatefulSetForRack returns the StatefulSet underlying the given rack.
+func GetStatefulSetForRack(ctx context.Context, rack scyllav1.RackSpec, cluster *scyllav1.ScyllaCluster, kubeClient kubernetes.Interface) (*appsv1.StatefulSet, error) {
+	stsName := naming.StatefulSetNameForRack(rack, cluster)
+	sts, err := kubeClient.AppsV1().StatefulSets(cluster.Namespace).Get(ctx, stsName, metav1.GetOptions{})
+	if err != nil {
+		return nil, err
+	}
+	return sts, nil
+}
+
 // RefFromString is a helper function that takes a string
 // and outputs a reference to that string.
 // Useful for initializing a string pointer from a literal.
diff --git a/pkg/naming/constants.go b/pkg/naming/constants.go
index 60184b1cf25..60baf06f85f 100644
--- a/pkg/naming/constants.go
+++ b/pkg/naming/constants.go
@@ -64,7 +64,8 @@
 
 // Configuration Values
 const (
-	ScyllaContainerName = "scylla"
+	ScyllaContainerName          = "scylla"
+	SidecarInjectorContainerName = "sidecar-injection"
 
 	PVCTemplateName = "data"
diff --git a/pkg/naming/names.go b/pkg/naming/names.go
index 71cd99a0775..dcffd08befa 100644
--- a/pkg/naming/names.go
+++ b/pkg/naming/names.go
@@ -81,6 +81,7 @@ func IndexFromName(n string) (int32, error) {
 	return int32(index), nil
 }
 
+// ImageToVersion extracts the version part from a container image reference.
 func ImageToVersion(image string) (string, error) {
 	parts := strings.Split(image, ":")
 	if len(parts) != 2 || len(parts[1]) == 0 {
@@ -89,17 +90,28 @@ func ImageToVersion(image string) (string, error) {
 	return parts[1], nil
 }
 
+// FindScyllaContainer returns the index of the Scylla container in the given list.
 func FindScyllaContainer(containers []corev1.Container) (int, error) {
+	return FindContainerWithName(containers, ScyllaContainerName)
+}
+
+// FindSidecarInjectorContainer returns the index of the sidecar injector container in the given list.
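+// The sidecar injector is deployed as an init container of each Scylla Pod.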
+func FindSidecarInjectorContainer(containers []corev1.Container) (int, error) {
+	return FindContainerWithName(containers, SidecarInjectorContainerName)
+}
+
+// FindContainerWithName returns the index of the container with the given name.
+func FindContainerWithName(containers []corev1.Container, name string) (int, error) {
 	for idx := range containers {
-		if containers[idx].Name == ScyllaContainerName {
+		if containers[idx].Name == name {
 			return idx, nil
 		}
 	}
-	return -1, errors.New(fmt.Sprintf("Scylla Container '%s' not found", ScyllaContainerName))
+	return 0, errors.Errorf("'%s' container not found", name)
 }
 
-// ScyllaImage returns version of Scylla container.
-func ScyllaImage(containers []corev1.Container) (string, error) {
+// ScyllaVersion returns the version of the Scylla container.
+func ScyllaVersion(containers []corev1.Container) (string, error) {
 	idx, err := FindScyllaContainer(containers)
 	if err != nil {
 		return "", errors.Wrap(err, "find scylla container")
 	}
@@ -111,3 +123,17 @@ func ScyllaImage(containers []corev1.Container) (string, error) {
 	}
 	return version, nil
 }
+
+// SidecarVersion returns the version of the sidecar container.
+func SidecarVersion(containers []corev1.Container) (string, error) {
+	idx, err := FindSidecarInjectorContainer(containers)
+	if err != nil {
+		return "", errors.Wrap(err, "find sidecar container")
+	}
+
+	version, err := ImageToVersion(containers[idx].Image)
+	if err != nil {
+		return "", errors.Wrap(err, "parse sidecar container version")
+	}
+	return version, nil
+}
diff --git a/pkg/test/unit/helpers.go b/pkg/test/unit/helpers.go
index f270742c5dd..a2b38d23491 100644
--- a/pkg/test/unit/helpers.go
+++ b/pkg/test/unit/helpers.go
@@ -1,15 +1,24 @@
 package unit
 
 import (
+	"fmt"
+
 	scyllav1 "github.com/scylladb/scylla-operator/pkg/api/v1"
 	"github.com/scylladb/scylla-operator/pkg/controllers/cluster/util"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
+// NewSingleRackCluster returns a ScyllaCluster with a single rack.
 func NewSingleRackCluster(members int32) *scyllav1.ScyllaCluster {
 	return NewDetailedSingleRackCluster("test-cluster", "test-ns", "repo", "2.3.1", "test-dc", "test-rack", members)
 }
 
+// NewMultiRackCluster returns a ScyllaCluster with one rack per supplied member count.
+func NewMultiRackCluster(members ...int32) *scyllav1.ScyllaCluster {
+	return NewDetailedMultiRackCluster("test-cluster", "test-ns", "repo", "2.3.1", "test-dc", members...)
+}
+
+// NewDetailedSingleRackCluster returns a single-rack ScyllaCluster built from the supplied parameters.
 func NewDetailedSingleRackCluster(name, namespace, repo, version, dc, rack string, members int32) *scyllav1.ScyllaCluster {
 	return &scyllav1.ScyllaCluster{
 		ObjectMeta: metav1.ObjectMeta{
@@ -43,3 +52,42 @@ func NewDetailedSingleRackCluster(name, namespace, repo, version, dc, rack strin
 		},
 	}
 }
+
+// NewDetailedMultiRackCluster creates a multi-rack ScyllaCluster from the supplied parameters.
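+// One rack named rack-<i> is created per entry in members, and each rack is
+// reported as fully ready in the cluster status.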
+func NewDetailedMultiRackCluster(name, namespace, repo, version, dc string, members ...int32) *scyllav1.ScyllaCluster { + c := &scyllav1.ScyllaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: scyllav1.ClusterSpec{ + Repository: util.RefFromString(repo), + Version: version, + Datacenter: scyllav1.DatacenterSpec{ + Name: dc, + Racks: []scyllav1.RackSpec{}, + }, + }, + Status: scyllav1.ClusterStatus{ + Racks: map[string]scyllav1.RackStatus{}, + }, + } + + for i, m := range members { + rack := fmt.Sprintf("rack-%d", i) + c.Spec.Datacenter.Racks = append(c.Spec.Datacenter.Racks, scyllav1.RackSpec{ + Name: rack, + Members: m, + Storage: scyllav1.StorageSpec{ + Capacity: "5Gi", + }, + }) + c.Status.Racks[rack] = scyllav1.RackStatus{ + Version: version, + Members: m, + ReadyMembers: m, + } + } + + return c +} diff --git a/test/integration/main_test.go b/test/integration/main_test.go index 3e8fbff83fb..b1d436b6252 100644 --- a/test/integration/main_test.go +++ b/test/integration/main_test.go @@ -63,7 +63,7 @@ func TestMain(m *testing.M) { } }() - options.GetOperatorOptions().Image = "scylladb/scylla-operator" + options.GetOperatorOptions().Image = "scylladb/scylla-operator:latest" defer func() { options.GetOperatorOptions().Image = "" }() diff --git a/test/integration/upgrade_version_test.go b/test/integration/upgrade_version_test.go index 9bd636a8e24..16273737aff 100644 --- a/test/integration/upgrade_version_test.go +++ b/test/integration/upgrade_version_test.go @@ -300,7 +300,7 @@ func scyllaImageInRackStatefulSet(ctx context.Context, rack scyllav1.RackSpec, c return "", err } - ver, err := naming.ScyllaImage(sts.Spec.Template.Spec.Containers) + ver, err := naming.ScyllaVersion(sts.Spec.Template.Spec.Containers) if err != nil { return "", err }