cluster: automated sidecar upgrade (scylladb#187)
The Operator checks whether the sidecar version in each rack's StatefulSet
is up to date. If it differs from the Operator version, the sidecar container
is updated (a rough sketch of this check follows below).

Fixes scylladb#187
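The check described above boils down to an image comparison. The sketch below is illustrative only and is not part of this commit: sidecarUpToDate, operatorImage, and the "sidecar-injection" container name are assumptions; in the actual diff the lookup goes through the naming package.

package sketch

import (
	appsv1 "k8s.io/api/apps/v1"
)

// sidecarUpToDate reports whether the rack StatefulSet already carries a
// sidecar init container whose image matches the Operator's own image.
// Illustrative only; names here are placeholders.
func sidecarUpToDate(sts *appsv1.StatefulSet, operatorImage string) bool {
	for _, c := range sts.Spec.Template.Spec.InitContainers {
		if c.Name == "sidecar-injection" { // placeholder container name
			return c.Image == operatorImage
		}
	}
	return false
}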
zimnx committed Feb 23, 2021
1 parent bc194d8 commit df708f3
Showing 15 changed files with 602 additions and 47 deletions.
83 changes: 83 additions & 0 deletions pkg/controllers/cluster/actions/rack_synchronized.go
@@ -0,0 +1,83 @@
// Copyright (C) 2021 ScyllaDB

package actions

import (
	"context"
	"fmt"

	"github.com/pkg/errors"
	"github.com/scylladb/go-log"
	scyllav1 "github.com/scylladb/scylla-operator/pkg/api/v1"
	"github.com/scylladb/scylla-operator/pkg/controllers/cluster/util"
	appsv1 "k8s.io/api/apps/v1"
)

type rackSynchronizedSubAction interface {
	// RackUpdated should return whether the requested update is already applied to the rack StatefulSet.
	RackUpdated(sts *appsv1.StatefulSet) (bool, error)
	// Update performs the desired update.
	Update(sts *appsv1.StatefulSet) error
	// Name of the action.
	Name() string
}

// rackSynchronizedAction performs a multi-rack update in a synchronized manner.
// Racks are upgraded in the same order as in the ScyllaCluster spec.
// A single Execute call performs at most a single update.
type rackSynchronizedAction struct {
	subAction rackSynchronizedSubAction
	cluster   *scyllav1.ScyllaCluster
	logger    log.Logger
}

const (
	RackSynchronizedActionPrefix = "rack-synchronized-"
)

func (a rackSynchronizedAction) Name() string {
	return fmt.Sprintf("%s%s", RackSynchronizedActionPrefix, a.subAction.Name())
}

func (a rackSynchronizedAction) Execute(ctx context.Context, s *State) error {
	for _, rack := range a.cluster.Spec.Datacenter.Racks {
		sts, err := util.GetStatefulSetForRack(ctx, rack, a.cluster, s.kubeclient)
		if err != nil {
			return errors.Wrap(err, "get rack statefulset")
		}

		rackUpdated, err := a.subAction.RackUpdated(sts)
		if err != nil {
			return errors.Wrap(err, "determine if rack needs update")
		}
		if !rackUpdated {
			upgradedSts := sts.DeepCopy()
			if err := a.subAction.Update(upgradedSts); err != nil {
				return errors.Wrap(err, "update rack")
			}

			// Patch the rack StatefulSet and exit.
			a.logger.Info(ctx, "Patching rack definition", "rack", rack.Name)
			if err := util.PatchStatefulSet(ctx, sts, upgradedSts, s.kubeclient); err != nil {
				return errors.Wrap(err, "patch statefulset")
			}

			return nil
		}

		if !a.stsReady(rack, sts) {
			a.logger.Info(ctx, "Rack still upgrading, awaiting readiness", "rack", rack.Name)
			return nil
		}

		a.logger.Info(ctx, "Rack updated", "rack", rack.Name)
	}

	return nil
}

func (a rackSynchronizedAction) stsReady(rack scyllav1.RackSpec, sts *appsv1.StatefulSet) bool {
	return sts.Generation == sts.Status.ObservedGeneration &&
		sts.Status.ReadyReplicas == rack.Members &&
		sts.Status.UpdateRevision == sts.Status.CurrentRevision
}
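For illustration, a minimal, hypothetical sub-action showing how the interface above plugs into rackSynchronizedAction. It assumes the same package and imports as the file above; the annotation handling and the "annotation-bump" name are made up for this sketch and are not part of the commit.

// Hypothetical sub-action sketch: roll each rack by bumping a pod-template
// annotation. The annotation key and action name are illustrative only.
type annotationBumpSubAction struct {
	key, value string
}

func (a *annotationBumpSubAction) RackUpdated(sts *appsv1.StatefulSet) (bool, error) {
	return sts.Spec.Template.Annotations[a.key] == a.value, nil
}

func (a *annotationBumpSubAction) Update(sts *appsv1.StatefulSet) error {
	if sts.Spec.Template.Annotations == nil {
		sts.Spec.Template.Annotations = map[string]string{}
	}
	sts.Spec.Template.Annotations[a.key] = a.value
	return nil
}

func (a *annotationBumpSubAction) Name() string {
	return "annotation-bump"
}

// Wrapping the sub-action in rackSynchronizedAction makes the rollout proceed
// rack by rack, in the order the racks are listed in the ScyllaCluster spec.
func newAnnotationBump(c *scyllav1.ScyllaCluster, key, value string, l log.Logger) *rackSynchronizedAction {
	return &rackSynchronizedAction{
		subAction: &annotationBumpSubAction{key: key, value: value},
		cluster:   c,
		logger:    l,
	}
}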
193 changes: 193 additions & 0 deletions pkg/controllers/cluster/actions/rack_synchronized_test.go
@@ -0,0 +1,193 @@
// Copyright (C) 2021 ScyllaDB

package actions

import (
	"context"
	"reflect"
	"testing"

	"github.com/scylladb/go-log"
	"github.com/scylladb/scylla-operator/pkg/controllers/cluster/resource"
	"github.com/scylladb/scylla-operator/pkg/test/unit"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/kubernetes/fake"
)

type subAction struct {
	updateState map[string]bool
	updates     []string
	updateFunc  func(sts *appsv1.StatefulSet) error
}

func (a *subAction) RackUpdated(sts *appsv1.StatefulSet) (bool, error) {
	return a.updateState[sts.Name], nil
}

func (a *subAction) Update(sts *appsv1.StatefulSet) error {
	a.updates = append(a.updates, sts.Name)
	if a.updateFunc != nil {
		return a.updateFunc(sts)
	}
	return nil
}

func (a subAction) Name() string {
	return "fake-sub-action"
}

func TestRackSynchronizedAction_SubActionUpdatesRack(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	logger, _ := log.NewProduction(log.Config{
		Level: zap.NewAtomicLevelAt(zapcore.DebugLevel),
	})

	cluster := unit.NewMultiRackCluster(1)
	firstRack := resource.StatefulSetForRack(cluster.Spec.Datacenter.Racks[0], cluster, "image")

	objects := []runtime.Object{firstRack}
	kubeClient := fake.NewSimpleClientset(objects...)

	sa := &subAction{
		updateFunc: func(sts *appsv1.StatefulSet) error {
			sts.Generation += 1
			return nil
		},
		updateState: map[string]bool{
			firstRack.Name: false,
		},
	}
	a := rackSynchronizedAction{
		subAction: sa,
		cluster:   cluster,
		logger:    logger,
	}
	s := &State{kubeclient: kubeClient}

	if err := a.Execute(ctx, s); err != nil {
		t.Fatal(err)
	}
	expectedUpdates := []string{firstRack.Name}
	if !reflect.DeepEqual(sa.updates, expectedUpdates) {
		t.Errorf("Expected %s updates, got %s", expectedUpdates, sa.updates)
	}

	sts, err := kubeClient.AppsV1().StatefulSets(cluster.Namespace).Get(ctx, firstRack.Name, metav1.GetOptions{})
	if err != nil {
		t.Errorf("Expected nil err, got %s", err)
	}

	if sts.Generation == firstRack.Generation {
		t.Error("Expected sts update")
	}
}

func TestRackSynchronizedAction_RackAreUpgradedInSequence(t *testing.T) {
	ctx := context.Background()
	logger, _ := log.NewProduction(log.Config{
		Level: zap.NewAtomicLevelAt(zapcore.DebugLevel),
	})

	cluster := unit.NewMultiRackCluster(1, 1)
	firstRack := resource.StatefulSetForRack(cluster.Spec.Datacenter.Racks[0], cluster, "image")
	firstRackReady := firstRack.DeepCopy()
	firstRackReady.Status.ObservedGeneration = firstRackReady.Generation
	firstRackReady.Status.ReadyReplicas = cluster.Spec.Datacenter.Racks[0].Members
	secondRack := resource.StatefulSetForRack(cluster.Spec.Datacenter.Racks[1], cluster, "image")
	secondRackReady := secondRack.DeepCopy()
	secondRackReady.Status.ObservedGeneration = secondRackReady.Generation
	secondRackReady.Status.ReadyReplicas = cluster.Spec.Datacenter.Racks[1].Members

	ts := []struct {
		Name            string
		Objects         []runtime.Object
		State           map[string]bool
		ExpectedUpdates []string
	}{
		{
			Name:    "nothing updated",
			Objects: []runtime.Object{firstRack, secondRack},
			State: map[string]bool{
				firstRack.Name:  false,
				secondRack.Name: false,
			},
			ExpectedUpdates: []string{firstRack.Name},
		},
		{
			Name:    "first rack updated, not ready",
			Objects: []runtime.Object{firstRack, secondRack},
			State: map[string]bool{
				firstRack.Name:  true,
				secondRack.Name: false,
			},
			ExpectedUpdates: nil,
		},
		{
			Name:    "first rack updated and ready",
			Objects: []runtime.Object{firstRackReady, secondRack},
			State: map[string]bool{
				firstRackReady.Name: true,
				secondRack.Name:     false,
			},
			ExpectedUpdates: []string{secondRack.Name},
		},
		{
			Name:    "all racks updated",
			Objects: []runtime.Object{firstRack, secondRack},
			State: map[string]bool{
				firstRack.Name:  true,
				secondRack.Name: true,
			},
			ExpectedUpdates: nil,
		},
		{
			Name:    "second rack updated, first not",
			Objects: []runtime.Object{firstRack, secondRack},
			State: map[string]bool{
				firstRack.Name:  false,
				secondRack.Name: true,
			},
			ExpectedUpdates: []string{firstRack.Name},
		},
		{
			Name:    "second rack updated and ready, first not",
			Objects: []runtime.Object{firstRack, secondRackReady},
			State: map[string]bool{
				firstRack.Name:  false,
				secondRack.Name: true,
			},
			ExpectedUpdates: []string{firstRack.Name},
		},
	}

	for i := range ts {
		test := ts[i]
		t.Run(test.Name, func(t *testing.T) {
			t.Parallel()

			kubeClient := fake.NewSimpleClientset(test.Objects...)

			sa := &subAction{
				updateState: test.State,
			}
			a := rackSynchronizedAction{
				subAction: sa,
				cluster:   cluster,
				logger:    logger,
			}
			s := &State{kubeclient: kubeClient}

			if err := a.Execute(ctx, s); err != nil {
				t.Fatal(err)
			}
			if !reflect.DeepEqual(sa.updates, test.ExpectedUpdates) {
				t.Errorf("Expected %s updates, got %s", test.ExpectedUpdates, sa.updates)
			}
		})
	}
}
54 changes: 54 additions & 0 deletions pkg/controllers/cluster/actions/sidecar_upgrade.go
@@ -0,0 +1,54 @@
// Copyright (C) 2021 ScyllaDB

package actions

import (
	"github.com/pkg/errors"
	"github.com/scylladb/go-log"
	scyllav1 "github.com/scylladb/scylla-operator/pkg/api/v1"
	"github.com/scylladb/scylla-operator/pkg/naming"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
)

const (
	SidecarVersionUpgradeAction = "sidecar-upgrade"
)

type SidecarUpgrade struct {
	sidecar corev1.Container
}

func (a *SidecarUpgrade) RackUpdated(sts *appsv1.StatefulSet) (bool, error) {
	sidecarIdx, err := naming.FindSidecarInjectorContainer(sts.Spec.Template.Spec.InitContainers)
	if err != nil {
		return false, errors.Wrap(err, "find sidecar container in pod")
	}

	return sts.Spec.Template.Spec.InitContainers[sidecarIdx].Image == a.sidecar.Image, nil
}

func (a *SidecarUpgrade) Update(sts *appsv1.StatefulSet) error {
	initContainers := sts.Spec.Template.Spec.InitContainers
	sidecarIdx, err := naming.FindSidecarInjectorContainer(initContainers)
	if err != nil {
		return errors.Wrap(err, "find sidecar container in existing sts")
	}

	sts.Spec.Template.Spec.InitContainers[sidecarIdx] = a.sidecar
	return nil
}

func NewSidecarUpgrade(c *scyllav1.ScyllaCluster, sidecar corev1.Container, l log.Logger) *rackSynchronizedAction {
	return &rackSynchronizedAction{
		subAction: &SidecarUpgrade{
			sidecar: sidecar,
		},
		cluster: c,
		logger:  l,
	}
}

func (a *SidecarUpgrade) Name() string {
	return SidecarVersionUpgradeAction
}
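The call site that feeds NewSidecarUpgrade is not part of this excerpt. The sketch below is speculative, reuses the imports of the file above, and assumes the Operator knows its own image; chooseSidecarAction, operatorImage, and the container name are illustrative, not the commit's code.

// Speculative call-site sketch (not in this diff). The desired sidecar
// container is built from the Operator's own image; the per-rack image
// comparison itself happens in SidecarUpgrade.RackUpdated as the
// rack-synchronized Execute loop walks the racks.
func chooseSidecarAction(cluster *scyllav1.ScyllaCluster, operatorImage string, logger log.Logger) *rackSynchronizedAction {
	desiredSidecar := corev1.Container{
		Name:  "sidecar-injection", // placeholder; the real name comes from the naming package
		Image: operatorImage,
	}
	// Racks are then upgraded one at a time, in ScyllaCluster spec order.
	return NewSidecarUpgrade(cluster, desiredSidecar, logger)
}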
