Skip to content

Commit

Permalink
cluster: automated sidecar upgrade (scylladb#187)
Browse files Browse the repository at this point in the history
The Operator sets the actual sidecar version in the ScyllaCluster status.
If it differs from the Operator version, the sidecar container
is updated.

Fixes scylladb#187
  • Loading branch information
zimnx committed Feb 22, 2021
1 parent bc194d8 commit 77d825c
Show file tree
Hide file tree
Showing 15 changed files with 596 additions and 47 deletions.
77 changes: 77 additions & 0 deletions pkg/controllers/cluster/actions/rack_synchronized.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright (C) 2021 ScyllaDB

package actions

import (
"context"
"fmt"

"github.com/pkg/errors"
"github.com/scylladb/go-log"
scyllav1 "github.com/scylladb/scylla-operator/pkg/api/v1"
"github.com/scylladb/scylla-operator/pkg/controllers/cluster/util"
appsv1 "k8s.io/api/apps/v1"
)

// rackSynchronizedSubAction describes a single per-rack change that
// rackSynchronizedAction rolls out across the racks of a cluster.
type rackSynchronizedSubAction interface {
	// RackUpdated reports whether the requested update is already applied to
	// the given rack StatefulSet. A non-nil error aborts the enclosing action.
	RackUpdated(sts *appsv1.StatefulSet) (bool, error)
	// Update applies the desired change to sts in place. rackSynchronizedAction
	// calls it on a deep copy of the live object and patches the difference.
	Update(sts *appsv1.StatefulSet) error
	// Name returns the name of the sub-action; rackSynchronizedAction appends it
	// to RackSynchronizedActionPrefix to build its own name.
	Name() string
}

// rackSynchronizedAction can be used to perform a multi-rack update in a synchronous manner.
// Racks are upgraded in the same order as they appear in the ScyllaCluster Spec.
// A single Execute call performs at most a single update.
type rackSynchronizedAction struct {
	// subAction decides whether a rack is already updated and performs the update.
	subAction rackSynchronizedSubAction
	// cluster supplies the ordered list of racks to process.
	cluster *scyllav1.ScyllaCluster
	// logger receives progress messages emitted by Execute.
	logger log.Logger
}

const (
	// RackSynchronizedActionPrefix is prepended to the sub-action name to form
	// the name reported by rackSynchronizedAction.Name.
	RackSynchronizedActionPrefix = "rack-synchronized-"
)

// Name returns RackSynchronizedActionPrefix followed by the name of the
// wrapped sub-action.
func (a rackSynchronizedAction) Name() string {
	subActionName := a.subAction.Name()
	return fmt.Sprintf("%s%s", RackSynchronizedActionPrefix, subActionName)
}

// Execute walks the racks in ScyllaCluster Spec order and performs at most one
// step per call:
//
//   - the first rack the sub-action reports as not updated is patched, and
//     Execute returns;
//   - a rack that is updated but whose StatefulSet has not yet observed its
//     latest generation, or whose ready replica count differs from the rack's
//     member count, makes Execute return without touching later racks;
//   - a rack that is updated and ready is skipped and the next rack is examined.
//
// It returns nil when there is nothing (more) to do in this call, and a wrapped
// error when fetching, inspecting, updating or patching a StatefulSet fails.
//
// NOTE(review): readiness is derived only from ObservedGeneration and
// ReadyReplicas; confirm whether Status.UpdatedReplicas should be consulted too.
func (a rackSynchronizedAction) Execute(ctx context.Context, s *State) error {
	for _, rack := range a.cluster.Spec.Datacenter.Racks {
		sts, err := util.GetStatefulSetForRack(ctx, rack, a.cluster, s.kubeclient)
		if err != nil {
			return errors.Wrap(err, "get rack statefulset")
		}

		alreadyUpdated, err := a.subAction.RackUpdated(sts)
		if err != nil {
			return errors.Wrap(err, "determine if rack needs update")
		}

		if alreadyUpdated {
			generationObserved := sts.Generation == sts.Status.ObservedGeneration
			allMembersReady := sts.Status.ReadyReplicas == rack.Members
			if !generationObserved || !allMembersReady {
				// Do not move on to the next rack until this one has settled.
				a.logger.Info(ctx, "Rack still upgrading, awaiting readiness", "rack", rack.Name)
				return nil
			}

			a.logger.Info(ctx, "Rack updated", "rack", rack.Name)
			continue
		}

		// Apply the change to a copy so the live object can serve as the patch base.
		desired := sts.DeepCopy()
		if err := a.subAction.Update(desired); err != nil {
			return errors.Wrap(err, "update rack")
		}

		// Patch rack sts and exit
		a.logger.Info(ctx, "Patching rack definition", "rack", rack.Name)
		if err := util.PatchStatefulSet(ctx, sts, desired, s.kubeclient); err != nil {
			return errors.Wrap(err, "patch statefulset")
		}

		return nil
	}

	return nil
}
193 changes: 193 additions & 0 deletions pkg/controllers/cluster/actions/rack_synchronized_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
// Copyright (C) 2021 ScyllaDB

package actions

import (
"context"
"reflect"
"testing"

"github.com/scylladb/go-log"
"github.com/scylladb/scylla-operator/pkg/controllers/cluster/resource"
"github.com/scylladb/scylla-operator/pkg/test/unit"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"
)

// subAction is a fake sub-action used to drive rackSynchronizedAction in tests.
type subAction struct {
	// updateState maps a StatefulSet name to the value RackUpdated returns for it.
	updateState map[string]bool
	// updates records, in call order, the names of StatefulSets passed to Update.
	updates []string
	// updateFunc, when non-nil, is invoked by Update and its error is returned.
	updateFunc func(sts *appsv1.StatefulSet) error
}

// RackUpdated returns the preconfigured state for the StatefulSet's name;
// names missing from updateState are reported as not updated.
func (a *subAction) RackUpdated(sts *appsv1.StatefulSet) (bool, error) {
	updated := a.updateState[sts.Name]
	return updated, nil
}

// Update records the StatefulSet name and then delegates to updateFunc when
// one is configured; otherwise it succeeds without modifying sts.
func (a *subAction) Update(sts *appsv1.StatefulSet) error {
	a.updates = append(a.updates, sts.Name)
	if a.updateFunc == nil {
		return nil
	}
	return a.updateFunc(sts)
}

// Name returns the fixed name of the fake sub-action.
//
// It uses a pointer receiver to match the other methods of subAction, which is
// only ever used through *subAction.
func (a *subAction) Name() string {
	return "fake-sub-action"
}

// TestRackSynchronizedAction_SubActionUpdatesRack verifies that, for a rack the
// sub-action reports as not updated, Execute calls the sub-action's Update and
// that the change it makes is persisted to the StatefulSet.
func TestRackSynchronizedAction_SubActionUpdatesRack(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	logger, err := log.NewProduction(log.Config{
		Level: zap.NewAtomicLevelAt(zapcore.DebugLevel),
	})
	if err != nil {
		t.Fatalf("Expected nil err when creating logger, got %s", err)
	}

	cluster := unit.NewMultiRackCluster(1)
	firstRack := resource.StatefulSetForRack(cluster.Spec.Datacenter.Racks[0], cluster, "image")

	objects := []runtime.Object{firstRack}
	kubeClient := fake.NewSimpleClientset(objects...)

	sa := &subAction{
		// Bump the generation so the persisted object is distinguishable from
		// the original one.
		updateFunc: func(sts *appsv1.StatefulSet) error {
			sts.Generation++
			return nil
		},
		updateState: map[string]bool{
			firstRack.Name: false,
		},
	}
	a := rackSynchronizedAction{
		subAction: sa,
		cluster:   cluster,
		logger:    logger,
	}
	s := &State{kubeclient: kubeClient}

	if err := a.Execute(ctx, s); err != nil {
		t.Fatal(err)
	}
	expectedUpdates := []string{firstRack.Name}
	if !reflect.DeepEqual(sa.updates, expectedUpdates) {
		t.Errorf("Expected %s updates, got %s", expectedUpdates, sa.updates)
	}

	sts, err := kubeClient.AppsV1().StatefulSets(cluster.Namespace).Get(ctx, firstRack.Name, metav1.GetOptions{})
	if err != nil {
		// Fatal, not Error: sts is unusable when Get fails and is dereferenced below.
		t.Fatalf("Expected nil err, got %s", err)
	}

	if sts.Generation == firstRack.Generation {
		t.Error("Expected sts update")
	}
}

// TestRackSynchronizedAction_RackAreUpgradedInSequence verifies that a single
// Execute call updates at most one rack, processes racks in spec order, and
// does not advance past a rack that is updated but not yet ready.
func TestRackSynchronizedAction_RackAreUpgradedInSequence(t *testing.T) {
	ctx := context.Background()
	logger, err := log.NewProduction(log.Config{
		Level: zap.NewAtomicLevelAt(zapcore.DebugLevel),
	})
	if err != nil {
		t.Fatalf("Expected nil err when creating logger, got %s", err)
	}

	cluster := unit.NewMultiRackCluster(1, 1)
	firstRack := resource.StatefulSetForRack(cluster.Spec.Datacenter.Racks[0], cluster, "image")
	// The "Ready" variants have observed their current generation and report
	// as many ready replicas as the rack has members.
	firstRackReady := firstRack.DeepCopy()
	firstRackReady.Status.ObservedGeneration = firstRackReady.Generation
	firstRackReady.Status.ReadyReplicas = cluster.Spec.Datacenter.Racks[0].Members
	secondRack := resource.StatefulSetForRack(cluster.Spec.Datacenter.Racks[1], cluster, "image")
	secondRackReady := secondRack.DeepCopy()
	secondRackReady.Status.ObservedGeneration = secondRackReady.Generation
	secondRackReady.Status.ReadyReplicas = cluster.Spec.Datacenter.Racks[1].Members

	ts := []struct {
		// Name identifies the subtest.
		Name string
		// Objects are the StatefulSets preloaded into the fake client.
		Objects []runtime.Object
		// State is what the fake sub-action reports from RackUpdated, by sts name.
		State map[string]bool
		// ExpectedUpdates lists the sts names Update must have been called with.
		ExpectedUpdates []string
	}{
		{
			Name:    "nothing updated",
			Objects: []runtime.Object{firstRack, secondRack},
			State: map[string]bool{
				firstRack.Name:  false,
				secondRack.Name: false,
			},
			ExpectedUpdates: []string{firstRack.Name},
		},
		{
			Name:    "first rack updated, not ready",
			Objects: []runtime.Object{firstRack, secondRack},
			State: map[string]bool{
				firstRack.Name:  true,
				secondRack.Name: false,
			},
			ExpectedUpdates: nil,
		},
		{
			Name:    "first rack updated and ready",
			Objects: []runtime.Object{firstRackReady, secondRack},
			State: map[string]bool{
				firstRackReady.Name: true,
				secondRack.Name:     false,
			},
			ExpectedUpdates: []string{secondRack.Name},
		},
		{
			Name:    "all racks updated",
			Objects: []runtime.Object{firstRack, secondRack},
			State: map[string]bool{
				firstRack.Name:  true,
				secondRack.Name: true,
			},
			ExpectedUpdates: nil,
		},
		{
			Name:    "second rack updated, first not",
			Objects: []runtime.Object{firstRack, secondRack},
			State: map[string]bool{
				firstRack.Name:  false,
				secondRack.Name: true,
			},
			ExpectedUpdates: []string{firstRack.Name},
		},
		{
			Name:    "second rack updated and ready, first not",
			Objects: []runtime.Object{firstRack, secondRackReady},
			State: map[string]bool{
				firstRack.Name:  false,
				secondRack.Name: true,
			},
			ExpectedUpdates: []string{firstRack.Name},
		},
	}

	for i := range ts {
		// Copy the case so each parallel subtest closes over its own value.
		test := ts[i]
		t.Run(test.Name, func(t *testing.T) {
			t.Parallel()

			kubeClient := fake.NewSimpleClientset(test.Objects...)

			sa := &subAction{
				updateState: test.State,
			}
			a := rackSynchronizedAction{
				subAction: sa,
				cluster:   cluster,
				logger:    logger,
			}
			s := &State{kubeclient: kubeClient}

			if err := a.Execute(ctx, s); err != nil {
				t.Fatal(err)
			}
			if !reflect.DeepEqual(sa.updates, test.ExpectedUpdates) {
				t.Errorf("Expected %s updates, got %s", test.ExpectedUpdates, sa.updates)
			}
		})
	}
}
54 changes: 54 additions & 0 deletions pkg/controllers/cluster/actions/sidecar_upgrade.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright (C) 2021 ScyllaDB

package actions

import (
"github.com/pkg/errors"
"github.com/scylladb/go-log"
scyllav1 "github.com/scylladb/scylla-operator/pkg/api/v1"
"github.com/scylladb/scylla-operator/pkg/naming"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
)

const (
	// SidecarVersionUpgradeAction is the name returned by SidecarUpgrade.Name.
	SidecarVersionUpgradeAction = "sidecar-upgrade"
)

// SidecarUpgrade is a rack sub-action that replaces the sidecar injector init
// container of a rack StatefulSet with the configured container definition.
type SidecarUpgrade struct {
	// sidecar is the container written in place of the existing sidecar
	// injector init container; its Image is what RackUpdated compares against.
	sidecar corev1.Container
}

// RackUpdated reports whether the sidecar injector init container of sts
// already runs the desired sidecar image. Only the Image field is compared.
// An error is returned when the sidecar injector container cannot be located.
func (a *SidecarUpgrade) RackUpdated(sts *appsv1.StatefulSet) (bool, error) {
	initContainers := sts.Spec.Template.Spec.InitContainers

	idx, err := naming.FindSidecarInjectorContainer(initContainers)
	if err != nil {
		return false, errors.Wrap(err, "find sidecar container in pod")
	}

	imageMatches := initContainers[idx].Image == a.sidecar.Image
	return imageMatches, nil
}

// Update overwrites the sidecar injector init container in sts with the
// configured sidecar container. sts is modified in place. An error is returned
// when the sidecar injector container cannot be located.
func (a *SidecarUpgrade) Update(sts *appsv1.StatefulSet) error {
	podSpec := &sts.Spec.Template.Spec

	idx, err := naming.FindSidecarInjectorContainer(podSpec.InitContainers)
	if err != nil {
		return errors.Wrap(err, "find sidecar container in existing sts")
	}

	podSpec.InitContainers[idx] = a.sidecar
	return nil
}

// NewSidecarUpgrade returns a rack-synchronized action for cluster c that
// replaces the sidecar injector init container with sidecar, logging progress
// through l.
func NewSidecarUpgrade(c *scyllav1.ScyllaCluster, sidecar corev1.Container, l log.Logger) *rackSynchronizedAction {
	upgrade := &SidecarUpgrade{sidecar: sidecar}

	action := rackSynchronizedAction{
		subAction: upgrade,
		cluster:   c,
		logger:    l,
	}
	return &action
}

// Name returns SidecarVersionUpgradeAction, the name of this sub-action.
func (a *SidecarUpgrade) Name() string {
	return SidecarVersionUpgradeAction
}
69 changes: 69 additions & 0 deletions pkg/controllers/cluster/actions/sidecar_upgrade_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright (C) 2021 ScyllaDB

package actions

import (
"context"
"reflect"
"testing"

"github.com/scylladb/go-log"
"github.com/scylladb/scylla-operator/pkg/controllers/cluster/resource"
"github.com/scylladb/scylla-operator/pkg/naming"
"github.com/scylladb/scylla-operator/pkg/test/unit"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"
)

// TestSidecarUpgradeAction verifies that executing the sidecar upgrade action
// against a rack whose StatefulSet uses the old sidecar image replaces the
// sidecar injector init container with the full updated container definition.
func TestSidecarUpgradeAction(t *testing.T) {
	const (
		preUpdateImage  = "sidecar:old"
		postUpdateImage = "sidecar:new"
	)
	var (
		postUpdateCommand = []string{"true"}
	)

	ctx := context.Background()
	logger, err := log.NewProduction(log.Config{
		Level: zap.NewAtomicLevelAt(zapcore.DebugLevel),
	})
	if err != nil {
		t.Fatalf("Expected nil err when creating logger, got %s", err)
	}

	cluster := unit.NewMultiRackCluster(1)
	rack := cluster.Spec.Datacenter.Racks[0]
	rackSts := resource.StatefulSetForRack(rack, cluster, preUpdateImage)

	// Differs from the existing container in more than the image so the test
	// can tell a whole-container replacement from an image-only change.
	updatedContainer := corev1.Container{
		Name:    naming.SidecarInjectorContainerName,
		Image:   postUpdateImage,
		Command: postUpdateCommand,
	}

	objects := []runtime.Object{rackSts}
	kubeClient := fake.NewSimpleClientset(objects...)

	a := NewSidecarUpgrade(cluster, updatedContainer, logger)
	s := &State{kubeclient: kubeClient}

	if err := a.Execute(ctx, s); err != nil {
		t.Fatal(err)
	}

	sts, err := kubeClient.AppsV1().StatefulSets(cluster.Namespace).Get(ctx, rackSts.Name, metav1.GetOptions{})
	if err != nil {
		t.Fatalf("Expected nil err, got %s", err)
	}

	idx, err := naming.FindSidecarInjectorContainer(sts.Spec.Template.Spec.InitContainers)
	if err != nil {
		t.Fatalf("Expected nil err, got %s", err)
	}

	if !reflect.DeepEqual(sts.Spec.Template.Spec.InitContainers[idx], updatedContainer) {
		t.Fatalf("Expected containers to be equal, got %+v, expected %+v", sts.Spec.Template.Spec.InitContainers[idx], updatedContainer)
	}
}
Loading

0 comments on commit 77d825c

Please sign in to comment.