cluster: automated sidecar upgrade (scylladb#187)
The Operator sets the actual sidecar version in the ScyllaCluster status.
If it differs from the Operator version, the sidecar container is updated.

Fixes scylladb#187
zimnx committed Feb 22, 2021
1 parent bc194d8 commit 1b7266a
Showing 15 changed files with 721 additions and 47 deletions.
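
The core of the mechanism described in the commit message can be sketched as follows. This is an illustrative snippet only, not code from the commit; the function and parameter names are assumptions.

// Illustrative only: the Operator records the sidecar version it observes in
// the ScyllaCluster status and schedules an update of the sidecar container
// whenever that version differs from the Operator's own version.
func sidecarUpgradeRequired(observedSidecarVersion, operatorVersion string) bool {
	// Any mismatch means the running sidecar lags behind (or is ahead of)
	// the Operator and its container image must be replaced.
	return observedSidecarVersion != operatorVersion
}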
97 changes: 97 additions & 0 deletions pkg/controllers/cluster/actions/rack_synchronized.go
@@ -0,0 +1,97 @@
// Copyright (C) 2021 ScyllaDB

package actions

import (
	"context"
	"fmt"

	"github.com/pkg/errors"
	"github.com/scylladb/go-log"
	scyllav1 "github.com/scylladb/scylla-operator/pkg/api/v1"
	"github.com/scylladb/scylla-operator/pkg/controllers/cluster/util"
	"github.com/scylladb/scylla-operator/pkg/naming"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type rackSynchronizedSubAction interface {
	// RackUpdated should return whether the requested update is already applied to the rack StatefulSet.
	RackUpdated(sts *appsv1.StatefulSet) (bool, error)
	// PodUpdated should return whether the requested update is already applied to a rack Pod.
	PodUpdated(pod *corev1.Pod) (bool, error)
	// Update performs the desired update.
	Update(sts *appsv1.StatefulSet) error
	// Name returns the name of the sub-action.
	Name() string
}

// rackSynchronizedAction can be used to perform a multi-rack update in a synchronized manner.
// Racks are upgraded in the same order as they appear in the ScyllaCluster Spec.
// A single Execute call performs at most one update.
type rackSynchronizedAction struct {
	subAction rackSynchronizedSubAction
	cluster   *scyllav1.ScyllaCluster
	logger    log.Logger
}

const (
	RackSynchronizedActionPrefix = "rack-synchronized-"
)

func (a rackSynchronizedAction) Name() string {
	return fmt.Sprintf("%s%s", RackSynchronizedActionPrefix, a.subAction.Name())
}

func (a rackSynchronizedAction) Execute(ctx context.Context, s *State) error {
	for _, rack := range a.cluster.Spec.Datacenter.Racks {
		sts, err := util.GetStatefulSetForRack(ctx, rack, a.cluster, s.kubeclient)
		if err != nil {
			return errors.Wrap(err, "get rack statefulset")
		}

		rackUpdated, err := a.subAction.RackUpdated(sts)
		if err != nil {
			return errors.Wrap(err, "determine if rack needs update")
		}
		if !rackUpdated {
			upgradedSts := sts.DeepCopy()
			if err := a.subAction.Update(upgradedSts); err != nil {
				return errors.Wrap(err, "update rack")
			}

			// Patch rack sts and exit
			a.logger.Info(ctx, "Patching rack definition", "rack", rack.Name)
			if err := util.PatchStatefulSet(ctx, sts, upgradedSts, s.kubeclient); err != nil {
				return errors.Wrap(err, "patch statefulset")
			}

			return nil
		}

		a.logger.Info(ctx, "Rack already upgraded, checking Pods", "rack", rack.Name)

		pods, err := s.kubeclient.CoreV1().Pods(a.cluster.Namespace).List(ctx, metav1.ListOptions{
			LabelSelector: naming.RackSelector(rack, a.cluster).String(),
		})
		if err != nil {
			return errors.Wrap(err, "get pods")
		}

		for _, pod := range pods.Items {
			podUpdated, err := a.subAction.PodUpdated(&pod)
			if err != nil {
				return errors.Wrap(err, "check if pod is updated")
			}
			if !podUpdated || !podReady(&pod) {
				a.logger.Info(ctx, "Rack pod is not updated, awaiting readiness", "rack", rack.Name, "pod", pod.Name, "pod_ready", podReady(&pod), "pod_updated", podUpdated)
				return nil
			}
			a.logger.Debug(ctx, "Rack Pod is up to date", "rack", rack.Name, "pod", pod.Name)
		}
		a.logger.Info(ctx, "Rack updated", "rack", rack.Name)
	}

	return nil
}
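
A concrete sub-action plugged into rackSynchronizedSubAction could look roughly like the sketch below. This is not code from the commit (the actual sidecar sub-action lives in one of the other changed files); the sidecarImageUpgrade type, the container name, and the target image field are assumptions made for illustration.

package actions

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
)

const sidecarContainerName = "scylla-operator-sidecar" // assumed container name

// sidecarImageUpgrade is a hypothetical sub-action that bumps the sidecar
// container image on a rack StatefulSet to the image the Operator wants to run.
type sidecarImageUpgrade struct {
	targetImage string
}

func (u sidecarImageUpgrade) Name() string {
	return "sidecar-image-upgrade"
}

// RackUpdated reports whether the rack StatefulSet already references the target image.
func (u sidecarImageUpgrade) RackUpdated(sts *appsv1.StatefulSet) (bool, error) {
	for _, c := range sts.Spec.Template.Spec.Containers {
		if c.Name == sidecarContainerName {
			return c.Image == u.targetImage, nil
		}
	}
	return false, fmt.Errorf("sidecar container %q not found", sidecarContainerName)
}

// PodUpdated reports whether a rack Pod is already running the target image.
func (u sidecarImageUpgrade) PodUpdated(pod *corev1.Pod) (bool, error) {
	for _, c := range pod.Spec.Containers {
		if c.Name == sidecarContainerName {
			return c.Image == u.targetImage, nil
		}
	}
	return false, fmt.Errorf("sidecar container %q not found", sidecarContainerName)
}

// Update rewrites the sidecar container image in the StatefulSet Pod template.
func (u sidecarImageUpgrade) Update(sts *appsv1.StatefulSet) error {
	for i := range sts.Spec.Template.Spec.Containers {
		if sts.Spec.Template.Spec.Containers[i].Name == sidecarContainerName {
			sts.Spec.Template.Spec.Containers[i].Image = u.targetImage
			return nil
		}
	}
	return fmt.Errorf("sidecar container %q not found", sidecarContainerName)
}

Wrapped as rackSynchronizedAction{subAction: sidecarImageUpgrade{targetImage: img}, cluster: cluster, logger: logger}, such a sub-action would perform at most one StatefulSet patch per Execute call and wait for each rack's Pods to become ready before moving on to the next rack.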
250 changes: 250 additions & 0 deletions pkg/controllers/cluster/actions/rack_synchronized_test.go
@@ -0,0 +1,250 @@
// Copyright (C) 2021 ScyllaDB

package actions

import (
	"context"
	"reflect"
	"testing"

	"github.com/scylladb/go-log"
	"github.com/scylladb/scylla-operator/pkg/controllers/cluster/resource"
	"github.com/scylladb/scylla-operator/pkg/test/unit"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/kubernetes/fake"
)

type subAction struct {
	updateState map[string]bool
	updates     []string
	updateFunc  func(sts *appsv1.StatefulSet) error
}

func (a *subAction) RackUpdated(sts *appsv1.StatefulSet) (bool, error) {
	return a.updateState[sts.Name], nil
}

func (a *subAction) PodUpdated(pod *corev1.Pod) (bool, error) {
	return a.updateState[pod.Name], nil
}

func (a *subAction) Update(sts *appsv1.StatefulSet) error {
	a.updates = append(a.updates, sts.Name)
	if a.updateFunc != nil {
		return a.updateFunc(sts)
	}
	return nil
}

func (a subAction) Name() string {
	return "fake-sub-action"
}

func TestRackSynchronizedAction_SubActionUpdatesRack(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	logger, _ := log.NewProduction(log.Config{
		Level: zap.NewAtomicLevelAt(zapcore.DebugLevel),
	})

	cluster := unit.NewMultiRackCluster(1)
	firstRack := resource.StatefulSetForRack(cluster.Spec.Datacenter.Racks[0], cluster, "image")

	objects := []runtime.Object{firstRack}
	kubeClient := fake.NewSimpleClientset(objects...)

	sa := &subAction{
		updateFunc: func(sts *appsv1.StatefulSet) error {
			sts.Generation += 1
			return nil
		},
		updateState: map[string]bool{
			firstRack.Name: false,
		},
	}
	a := rackSynchronizedAction{
		subAction: sa,
		cluster:   cluster,
		logger:    logger,
	}
	s := &State{kubeclient: kubeClient}

	if err := a.Execute(ctx, s); err != nil {
		t.Fatal(err)
	}
	expectedUpdates := []string{firstRack.Name}
	if !reflect.DeepEqual(sa.updates, expectedUpdates) {
		t.Errorf("Expected %s updates, got %s", expectedUpdates, sa.updates)
	}

	sts, err := kubeClient.AppsV1().StatefulSets(cluster.Namespace).Get(ctx, firstRack.Name, metav1.GetOptions{})
	if err != nil {
		t.Errorf("Expected nil err, got %s", err)
	}

	if sts.Generation == firstRack.Generation {
		t.Error("Expected sts update")
	}
}

func TestRackSynchronizedAction_WaitUntilAllRackPodsEntersReadyState(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	logger, _ := log.NewProduction(log.Config{
		Level: zap.NewAtomicLevelAt(zapcore.DebugLevel),
	})

	cluster := unit.NewMultiRackCluster(2, 1)
	firstRack := resource.StatefulSetForRack(cluster.Spec.Datacenter.Racks[0], cluster, "image")
	secondRack := resource.StatefulSetForRack(cluster.Spec.Datacenter.Racks[1], cluster, "image")

	firstRackPod := unit.PodForRack(0, cluster.Spec.Datacenter.Racks[0], cluster, unit.ReadyPod)
	firstRackSecondPod := unit.PodForRack(1, cluster.Spec.Datacenter.Racks[0], cluster)
	secondRackPod := unit.PodForRack(0, cluster.Spec.Datacenter.Racks[1], cluster, unit.ReadyPod)

	objects := []runtime.Object{
		firstRack, secondRack,
		firstRackPod, firstRackSecondPod, secondRackPod,
	}

	kubeClient := fake.NewSimpleClientset(objects...)

	sa := &subAction{
		updateState: map[string]bool{
			firstRack.Name:          true,
			firstRackPod.Name:       true,
			firstRackSecondPod.Name: true,
			secondRack.Name:         false,
			secondRackPod.Name:      false,
		},
	}
	a := rackSynchronizedAction{
		subAction: sa,
		cluster:   cluster,
		logger:    logger,
	}
	s := &State{kubeclient: kubeClient}

	if err := a.Execute(ctx, s); err != nil {
		t.Fatal(err)
	}

	expectedUpdates := []string(nil)
	if !reflect.DeepEqual(sa.updates, expectedUpdates) {
		t.Errorf("Expected %s updates, got %s", expectedUpdates, sa.updates)
	}
}

func TestRackSynchronizedAction_RackAreUpgradedInSequence(t *testing.T) {
	ctx := context.Background()
	logger, _ := log.NewProduction(log.Config{
		Level: zap.NewAtomicLevelAt(zapcore.DebugLevel),
	})

	cluster := unit.NewMultiRackCluster(1, 1)
	firstRack := resource.StatefulSetForRack(cluster.Spec.Datacenter.Racks[0], cluster, "image")
	secondRack := resource.StatefulSetForRack(cluster.Spec.Datacenter.Racks[1], cluster, "image")

	firstRackPod := unit.PodForRack(0, cluster.Spec.Datacenter.Racks[0], cluster, unit.ReadyPod)
	secondRackPod := unit.PodForRack(0, cluster.Spec.Datacenter.Racks[1], cluster, unit.ReadyPod)

	objects := []runtime.Object{
		firstRack, secondRack,
		firstRackPod, secondRackPod,
	}
	kubeClient := fake.NewSimpleClientset(objects...)

	ts := []struct {
		Name            string
		State           map[string]bool
		ExpectedUpdates []string
	}{
		{
			Name: "nothing updated",
			State: map[string]bool{
				firstRack.Name:     false,
				secondRack.Name:    false,
				firstRackPod.Name:  false,
				secondRackPod.Name: false,
			},
			ExpectedUpdates: []string{firstRack.Name},
		},
		{
			Name: "rack updated, pod not yet",
			State: map[string]bool{
				firstRack.Name:     true,
				secondRack.Name:    false,
				firstRackPod.Name:  false,
				secondRackPod.Name: false,
			},
			ExpectedUpdates: nil,
		},
		{
			Name: "first rack updated",
			State: map[string]bool{
				firstRack.Name:     true,
				secondRack.Name:    false,
				firstRackPod.Name:  true,
				secondRackPod.Name: false,
			},
			ExpectedUpdates: []string{secondRack.Name},
		},
		{
			Name: "second rack updated, pod not yet",
			State: map[string]bool{
				firstRack.Name:     true,
				secondRack.Name:    true,
				firstRackPod.Name:  true,
				secondRackPod.Name: false,
			},
			ExpectedUpdates: nil,
		},
		{
			Name: "all racks updated",
			State: map[string]bool{
				firstRack.Name:     true,
				secondRack.Name:    true,
				firstRackPod.Name:  true,
				secondRackPod.Name: true,
			},
			ExpectedUpdates: nil,
		},
		{
			Name: "second rack updated, first not",
			State: map[string]bool{
				firstRack.Name:     false,
				secondRack.Name:    true,
				firstRackPod.Name:  false,
				secondRackPod.Name: true,
			},
			ExpectedUpdates: []string{firstRack.Name},
		},
	}

	for i := range ts {
		test := ts[i]
		t.Run(test.Name, func(t *testing.T) {
			t.Parallel()

			sa := &subAction{updateState: test.State}
			a := rackSynchronizedAction{
				subAction: sa,
				cluster:   cluster,
				logger:    logger,
			}
			s := &State{kubeclient: kubeClient}

			if err := a.Execute(ctx, s); err != nil {
				t.Fatal(err)
			}
			if !reflect.DeepEqual(sa.updates, test.ExpectedUpdates) {
				t.Errorf("Expected %s updates, got %s", test.ExpectedUpdates, sa.updates)
			}
		})
	}
}
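
With the repository checked out, the new tests can be run on their own with the standard Go tooling, for example:

go test ./pkg/controllers/cluster/actions/ -run TestRackSynchronizedAction -v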