cluster: automated sidecar upgrade (scylladb#187)
The Operator sets the actual sidecar version in the ScyllaCluster status.
If it differs from the Operator version, the sidecar container is updated.

Fixes scylladb#187
zimnx committed Feb 19, 2021
1 parent bc194d8 commit 598528e
Showing 18 changed files with 704 additions and 44 deletions.
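
The heart of this change is a per-rack comparison: each rack's status now records the sidecar version actually running, and a sidecar rollout is due whenever that version differs from the Operator's own. A minimal sketch of that check, assuming the ClusterStatus.Racks map from this API package; the helper name sidecarNeedsUpgrade is illustrative, not part of the commit:

package sketch

import (
    scyllav1 "github.com/scylladb/scylla-operator/pkg/api/v1"
)

// sidecarNeedsUpgrade reports whether any rack's recorded sidecar version
// differs from the version the Operator itself runs. Illustrative only; the
// real controller wires this decision into its cluster actions.
func sidecarNeedsUpgrade(cluster *scyllav1.ScyllaCluster, operatorVersion string) bool {
    for _, rackStatus := range cluster.Status.Racks {
        if rackStatus.SidecarVersion != operatorVersion {
            return true
        }
    }
    return false
}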
@@ -1611,6 +1611,9 @@ spec:
type: string
description: Pool of addresses which should be replaced by new nodes.
type: object
sidecarVersion:
description: SidecarVersion is the sidecar version in use.
type: string
version:
description: Version is the current version of Scylla in use.
type: string
3 changes: 3 additions & 0 deletions examples/common/operator.yaml
@@ -1626,6 +1626,9 @@ spec:
type: string
description: Pool of addresses which should be replaced by new nodes.
type: object
sidecarVersion:
description: SidecarVersion is the sidecar version in use.
type: string
version:
description: Version is the current version of Scylla in use.
type: string
9 changes: 6 additions & 3 deletions pkg/api/v1/cluster_types.go
@@ -288,6 +288,8 @@ type UpgradeStatus struct {
type RackStatus struct {
// Version is the current version of Scylla in use.
Version string `json:"version"`
// SidecarVersion is the sidecar version in use.
SidecarVersion string `json:"sidecarVersion,omitempty"`
// Members is the current number of members requested in the specific Rack
Members int32 `json:"members"`
// ReadyMembers is the number of ready members in the specific Rack
@@ -307,9 +309,10 @@ type RackCondition struct {
type RackConditionType string

const (
RackConditionTypeMemberLeaving RackConditionType = "MemberLeaving"
RackConditionTypeUpgrading RackConditionType = "RackUpgrading"
RackConditionTypeMemberReplacing RackConditionType = "MemberReplacing"
RackConditionTypeMemberLeaving RackConditionType = "MemberLeaving"
RackConditionTypeUpgrading RackConditionType = "RackUpgrading"
RackConditionTypeSidecarUpgrading RackConditionType = "RackSidecarUpgrading"
RackConditionTypeMemberReplacing RackConditionType = "MemberReplacing"
)

// +kubebuilder:object:root=true
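Alongside the new status field, a RackSidecarUpgrading condition is introduced. A hedged sketch of how a controller could raise it while a rollout is in flight; the helper markSidecarUpgrading is hypothetical, and the Conditions field and Type/Status members are assumed from the RackCondition type referenced above:

package sketch

import (
    scyllav1 "github.com/scylladb/scylla-operator/pkg/api/v1"
    corev1 "k8s.io/api/core/v1"
)

// markSidecarUpgrading appends the new condition to a rack's status while a
// synchronized sidecar rollout is in progress. Hypothetical helper, shown
// only to illustrate the new condition type.
func markSidecarUpgrading(status *scyllav1.RackStatus) {
    status.Conditions = append(status.Conditions, scyllav1.RackCondition{
        Type:   scyllav1.RackConditionTypeSidecarUpgrading,
        Status: corev1.ConditionTrue,
    })
}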
94 changes: 94 additions & 0 deletions pkg/controllers/cluster/actions/rack_synchronized.go
@@ -0,0 +1,94 @@
// Copyright (C) 2017 ScyllaDB

package actions

import (
"context"
"fmt"

"github.com/pkg/errors"
"github.com/scylladb/go-log"
scyllav1 "github.com/scylladb/scylla-operator/pkg/api/v1"
"github.com/scylladb/scylla-operator/pkg/controllers/cluster/util"
"github.com/scylladb/scylla-operator/pkg/naming"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type rackSynchronizedSubAction interface {
// RackUpdated should return whether the requested update is already applied to the rack's StatefulSet.
RackUpdated(sts *appsv1.StatefulSet) (bool, error)
// PodUpdated should return whether the requested update is already applied to the rack's Pod.
PodUpdated(pod *corev1.Pod) (bool, error)
// Update performs the desired update.
Update(sts *appsv1.StatefulSet) error
// Name returns the name of the action.
Name() string
}

// rackSynchronizedAction can be used to perform a multi-rack update in a synchronized manner.
// The racks' underlying StatefulSets are updated one by one; a single Execute performs at most one StatefulSet update.
// If the previous rack's Pods aren't updated yet, Execute does nothing.
// Racks are upgraded in the same order as they appear in the ScyllaCluster Spec.
type rackSynchronizedAction struct {
subAction rackSynchronizedSubAction
cluster *scyllav1.ScyllaCluster
logger log.Logger
}

const (
RackSynchronizedActionPrefix = "rack-synchronized-"
)

func (a rackSynchronizedAction) Name() string {
return fmt.Sprintf("%s%s", RackSynchronizedActionPrefix, a.subAction.Name())
}

func (a rackSynchronizedAction) Execute(ctx context.Context, s *State) error {
for _, rack := range a.cluster.Spec.Datacenter.Racks {
sts, err := util.GetStatefulSetForRack(ctx, rack, a.cluster, s.kubeclient)
if err != nil {
return errors.Wrap(err, "get rack statefulset")
}

rackUpdated, err := a.subAction.RackUpdated(sts)
if err != nil {
return errors.Wrap(err, "determine if rack needs update")
}
if !rackUpdated {
upgradedSts := sts.DeepCopy()
if err := a.subAction.Update(upgradedSts); err != nil {
return errors.Wrap(err, "update rack")
}

// Patch the rack StatefulSet and exit
a.logger.Info(ctx, "Patching rack definition", "rack", rack.Name)
return util.PatchStatefulSet(ctx, sts, upgradedSts, s.kubeclient)
}

a.logger.Info(ctx, "Rack already upgraded, checking Pods", "rack", rack.Name)

pods, err := s.kubeclient.CoreV1().Pods(a.cluster.Namespace).List(ctx, metav1.ListOptions{
LabelSelector: naming.RackSelector(rack, a.cluster).String(),
})
if err != nil {
return errors.Wrap(err, "get pods")
}

for _, pod := range pods.Items {
podUpdated, err := a.subAction.PodUpdated(&pod)
if err != nil {
return errors.Wrap(err, "check if pod is updated")
}
if !podUpdated || !podReady(&pod) {
a.logger.Info(ctx, "Rack pod is not updated, awaiting readiness", "rack", rack.Name, "pod", pod.Name, "pod_ready", podReady(&pod), "pod_updated", podUpdated)
return nil
}
a.logger.Debug(ctx, "Rack Pod is up to date", "rack", rack.Name, "pod", pod.Name)
}
a.logger.Info(ctx, "Rack updated", "rack", rack.Name)
}

return nil
}
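
To make the sub-action contract concrete: a sub-action only has to answer whether a StatefulSet or Pod already carries the update, and mutate the StatefulSet when it does not. Below is a hypothetical implementation that pins a container named "sidecar" to a target image; the container name and the whole type are assumptions for illustration, not the sub-action this commit ships elsewhere in the change:

package actions

import (
    "github.com/pkg/errors"
    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
)

// sidecarImageSubAction is an illustrative rackSynchronizedSubAction that
// bumps the image of a container named "sidecar".
type sidecarImageSubAction struct {
    image string
}

func (a *sidecarImageSubAction) RackUpdated(sts *appsv1.StatefulSet) (bool, error) {
    for _, c := range sts.Spec.Template.Spec.Containers {
        if c.Name == "sidecar" {
            return c.Image == a.image, nil
        }
    }
    return false, errors.New("sidecar container not found in statefulset")
}

func (a *sidecarImageSubAction) PodUpdated(pod *corev1.Pod) (bool, error) {
    for _, c := range pod.Spec.Containers {
        if c.Name == "sidecar" {
            return c.Image == a.image, nil
        }
    }
    return false, errors.New("sidecar container not found in pod")
}

func (a *sidecarImageSubAction) Update(sts *appsv1.StatefulSet) error {
    containers := sts.Spec.Template.Spec.Containers
    for i := range containers {
        if containers[i].Name == "sidecar" {
            containers[i].Image = a.image
            return nil
        }
    }
    return errors.New("sidecar container not found in statefulset")
}

// Name feeds into rackSynchronizedAction's prefixed action name.
func (a *sidecarImageSubAction) Name() string {
    return "sidecar-image"
}

Wrapped as rackSynchronizedAction{subAction: &sidecarImageSubAction{image: img}, cluster: c, logger: l}, repeated Execute calls would roll the image out one rack at a time, mirroring what the tests below exercise with a fake sub-action.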
250 changes: 250 additions & 0 deletions pkg/controllers/cluster/actions/rack_synchronized_test.go
@@ -0,0 +1,250 @@
// Copyright (C) 2017 ScyllaDB

package actions

import (
"context"
"reflect"
"testing"

"github.com/scylladb/go-log"
"github.com/scylladb/scylla-operator/pkg/controllers/cluster/resource"
"github.com/scylladb/scylla-operator/pkg/test/unit"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"
)

type subAction struct {
updateState map[string]bool
updates []string
updateFunc func(sts *appsv1.StatefulSet) error
}

func (a *subAction) RackUpdated(sts *appsv1.StatefulSet) (bool, error) {
return a.updateState[sts.Name], nil
}

func (a *subAction) PodUpdated(pod *corev1.Pod) (bool, error) {
return a.updateState[pod.Name], nil
}

func (a *subAction) Update(sts *appsv1.StatefulSet) error {
a.updates = append(a.updates, sts.Name)
if a.updateFunc != nil {
return a.updateFunc(sts)
}
return nil
}

func (a subAction) Name() string {
return "fake-sub-action"
}

func TestRackSynchronizedAction_SubActionUpdatesRack(t *testing.T) {
t.Parallel()
ctx := context.Background()
logger, _ := log.NewProduction(log.Config{
Level: zap.NewAtomicLevelAt(zapcore.DebugLevel),
})

cluster := unit.NewMultiRackCluster(1)
firstRack := resource.StatefulSetForRack(cluster.Spec.Datacenter.Racks[0], cluster, "image")

objects := []runtime.Object{firstRack}
kubeClient := fake.NewSimpleClientset(objects...)

sa := &subAction{
updateFunc: func(sts *appsv1.StatefulSet) error {
sts.Generation += 1
return nil
},
updateState: map[string]bool{
firstRack.Name: false,
},
}
a := rackSynchronizedAction{
subAction: sa,
cluster: cluster,
logger: logger,
}
s := &State{kubeclient: kubeClient}

if err := a.Execute(ctx, s); err != nil {
t.Fatal(err)
}
expectedUpdates := []string{firstRack.Name}
if !reflect.DeepEqual(sa.updates, expectedUpdates) {
t.Errorf("Expected %s updates, got %s", expectedUpdates, sa.updates)
}

sts, err := kubeClient.AppsV1().StatefulSets(cluster.Namespace).Get(ctx, firstRack.Name, metav1.GetOptions{})
if err != nil {
t.Errorf("Expected nil err, got %s", err)
}

if sts.Generation == firstRack.Generation {
t.Error("Expected sts update")
}
}

func TestRackSynchronizedAction_WaitUntilAllRackPodsEntersReadyState(t *testing.T) {
t.Parallel()
ctx := context.Background()
logger, _ := log.NewProduction(log.Config{
Level: zap.NewAtomicLevelAt(zapcore.DebugLevel),
})

cluster := unit.NewMultiRackCluster(2, 1)
firstRack := resource.StatefulSetForRack(cluster.Spec.Datacenter.Racks[0], cluster, "image")
secondRack := resource.StatefulSetForRack(cluster.Spec.Datacenter.Racks[1], cluster, "image")

firstRackPod := unit.PodForRack(0, cluster.Spec.Datacenter.Racks[0], cluster, unit.ReadyPod)
firstRackSecondPod := unit.PodForRack(1, cluster.Spec.Datacenter.Racks[0], cluster)
secondRackPod := unit.PodForRack(0, cluster.Spec.Datacenter.Racks[1], cluster, unit.ReadyPod)

objects := []runtime.Object{
firstRack, secondRack,
firstRackPod, firstRackSecondPod, secondRackPod,
}

kubeClient := fake.NewSimpleClientset(objects...)

sa := &subAction{
updateState: map[string]bool{
firstRack.Name: true,
firstRackPod.Name: true,
firstRackSecondPod.Name: true,
secondRack.Name: false,
secondRackPod.Name: false,
},
}
a := rackSynchronizedAction{
subAction: sa,
cluster: cluster,
logger: logger,
}
s := &State{kubeclient: kubeClient}

if err := a.Execute(ctx, s); err != nil {
t.Fatal(err)
}

expectedUpdates := []string(nil)
if !reflect.DeepEqual(sa.updates, expectedUpdates) {
t.Errorf("Expected %s updates, got %s", expectedUpdates, sa.updates)
}
}

func TestRackSynchronizedAction_RackAreUpgradedInSequence(t *testing.T) {
ctx := context.Background()
logger, _ := log.NewProduction(log.Config{
Level: zap.NewAtomicLevelAt(zapcore.DebugLevel),
})

cluster := unit.NewMultiRackCluster(1, 1)
firstRack := resource.StatefulSetForRack(cluster.Spec.Datacenter.Racks[0], cluster, "image")
secondRack := resource.StatefulSetForRack(cluster.Spec.Datacenter.Racks[1], cluster, "image")

firstRackPod := unit.PodForRack(0, cluster.Spec.Datacenter.Racks[0], cluster, unit.ReadyPod)
secondRackPod := unit.PodForRack(0, cluster.Spec.Datacenter.Racks[1], cluster, unit.ReadyPod)

objects := []runtime.Object{
firstRack, secondRack,
firstRackPod, secondRackPod,
}
kubeClient := fake.NewSimpleClientset(objects...)

ts := []struct {
Name string
State map[string]bool
ExpectedUpdates []string
}{
{
Name: "nothing updated",
State: map[string]bool{
firstRack.Name: false,
secondRack.Name: false,
firstRackPod.Name: false,
secondRackPod.Name: false,
},
ExpectedUpdates: []string{firstRack.Name},
},
{
Name: "rack updated, pod not yet",
State: map[string]bool{
firstRack.Name: true,
secondRack.Name: false,
firstRackPod.Name: false,
secondRackPod.Name: false,
},
ExpectedUpdates: nil,
},
{
Name: "first rack updated",
State: map[string]bool{
firstRack.Name: true,
secondRack.Name: false,
firstRackPod.Name: true,
secondRackPod.Name: false,
},
ExpectedUpdates: []string{secondRack.Name},
},
{
Name: "second rack updated, pod not yet",
State: map[string]bool{
firstRack.Name: true,
secondRack.Name: true,
firstRackPod.Name: true,
secondRackPod.Name: false,
},
ExpectedUpdates: nil,
},
{
Name: "all racks updated",
State: map[string]bool{
firstRack.Name: true,
secondRack.Name: true,
firstRackPod.Name: true,
secondRackPod.Name: true,
},
ExpectedUpdates: nil,
},
{
Name: "second rack updated, first not",
State: map[string]bool{
firstRack.Name: false,
secondRack.Name: true,
firstRackPod.Name: false,
secondRackPod.Name: true,
},
ExpectedUpdates: []string{firstRack.Name},
},
}

for i := range ts {
test := ts[i]
t.Run(test.Name, func(t *testing.T) {
t.Parallel()

sa := &subAction{updateState: test.State}
a := rackSynchronizedAction{
subAction: sa,
cluster: cluster,
logger: logger,
}
s := &State{kubeclient: kubeClient}

if err := a.Execute(ctx, s); err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(sa.updates, test.ExpectedUpdates) {
t.Errorf("Expected %s updates, got %s", test.ExpectedUpdates, sa.updates)
}
})
}
}