forked from scylladb/scylla-operator
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
cluster: automated sidecar upgrade (scylladb#187)
Operator checks whether version of sidecar in each rack StatefulSet is up to date. If it's different than Operator version, sidecar container is updated. Fixes scylladb#187
- Loading branch information
Showing
15 changed files
with
604 additions
and
47 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
// Copyright (C) 2021 ScyllaDB | ||
|
||
package actions | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
"github.com/pkg/errors" | ||
"github.com/scylladb/go-log" | ||
scyllav1 "github.com/scylladb/scylla-operator/pkg/api/v1" | ||
"github.com/scylladb/scylla-operator/pkg/controllers/cluster/util" | ||
appsv1 "k8s.io/api/apps/v1" | ||
apierrors "k8s.io/apimachinery/pkg/api/errors" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
) | ||
|
||
type rackSynchronizedSubAction interface { | ||
// RackUpdated should return whether requested update is already applied to a rack sts. | ||
RackUpdated(sts *appsv1.StatefulSet) (bool, error) | ||
// Update performs desired update. | ||
Update(sts *appsv1.StatefulSet) error | ||
// Name of action. | ||
Name() string | ||
} | ||
|
||
// rackSynchronizedAction can be used to perform an multi rack update in synchronous manner. | ||
// Racks are upgraded in the same order as in ScyllaCluster Spec. | ||
// Single Execute call performs at most a single update. | ||
type rackSynchronizedAction struct { | ||
subAction rackSynchronizedSubAction | ||
cluster *scyllav1.ScyllaCluster | ||
logger log.Logger | ||
} | ||
|
||
const ( | ||
RackSynchronizedActionPrefix = "rack-synchronized-" | ||
) | ||
|
||
func (a rackSynchronizedAction) Name() string { | ||
return fmt.Sprintf("%s%s", RackSynchronizedActionPrefix, a.subAction.Name()) | ||
} | ||
|
||
func (a rackSynchronizedAction) Execute(ctx context.Context, s *State) error { | ||
for _, rack := range a.cluster.Spec.Datacenter.Racks { | ||
sts := &appsv1.StatefulSet{} | ||
sts, err := util.GetStatefulSetForRack(ctx, rack, a.cluster, s.kubeclient) | ||
if err != nil { | ||
return errors.Wrap(err, "get rack statefulset") | ||
} | ||
|
||
rackUpdated, err := a.subAction.RackUpdated(sts) | ||
if err != nil { | ||
return errors.Wrap(err, "determine if rack needs update") | ||
} | ||
if !rackUpdated { | ||
if err := a.subAction.Update(sts); err != nil { | ||
return errors.Wrap(err, "update rack") | ||
} | ||
|
||
a.logger.Info(ctx, "Updating rack definition", "rack", rack.Name) | ||
sts, err = s.kubeclient.AppsV1().StatefulSets(sts.Namespace).Update(ctx, sts, metav1.UpdateOptions{}) | ||
if err != nil { | ||
// Do not raise error in case of a conflict, reconcilation loop will be triggered again | ||
// because new version of STS we are watching is available. | ||
if apierrors.IsConflict(err) { | ||
return nil | ||
} | ||
return err | ||
} | ||
|
||
// Early exit, next rack will be updated once current one becomes ready. | ||
return nil | ||
} | ||
|
||
if !a.stsReady(rack, sts) { | ||
a.logger.Info(ctx, "Rack still upgrading, awaiting readiness", "rack", rack.Name) | ||
return nil | ||
} | ||
|
||
a.logger.Info(ctx, "Rack updated", "rack", rack.Name) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (a rackSynchronizedAction) stsReady(rack scyllav1.RackSpec, sts *appsv1.StatefulSet) bool { | ||
return sts.Generation == sts.Status.ObservedGeneration && | ||
sts.Status.ReadyReplicas == rack.Members && | ||
sts.Status.UpdateRevision == sts.Status.CurrentRevision | ||
} |
193 changes: 193 additions & 0 deletions
193
pkg/controllers/cluster/actions/rack_synchronized_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,193 @@ | ||
// Copyright (C) 2021 ScyllaDB | ||
|
||
package actions | ||
|
||
import ( | ||
"context" | ||
"reflect" | ||
"testing" | ||
|
||
"github.com/scylladb/go-log" | ||
"github.com/scylladb/scylla-operator/pkg/controllers/cluster/resource" | ||
"github.com/scylladb/scylla-operator/pkg/test/unit" | ||
"go.uber.org/zap" | ||
"go.uber.org/zap/zapcore" | ||
appsv1 "k8s.io/api/apps/v1" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/apimachinery/pkg/runtime" | ||
"k8s.io/client-go/kubernetes/fake" | ||
) | ||
|
||
type subAction struct { | ||
updateState map[string]bool | ||
updates []string | ||
updateFunc func(sts *appsv1.StatefulSet) error | ||
} | ||
|
||
func (a *subAction) RackUpdated(sts *appsv1.StatefulSet) (bool, error) { | ||
return a.updateState[sts.Name], nil | ||
} | ||
|
||
func (a *subAction) Update(sts *appsv1.StatefulSet) error { | ||
a.updates = append(a.updates, sts.Name) | ||
if a.updateFunc != nil { | ||
return a.updateFunc(sts) | ||
} | ||
return nil | ||
} | ||
|
||
func (a subAction) Name() string { | ||
return "fake-sub-action" | ||
} | ||
|
||
func TestRackSynchronizedAction_SubActionUpdatesRack(t *testing.T) { | ||
t.Parallel() | ||
ctx := context.Background() | ||
logger, _ := log.NewProduction(log.Config{ | ||
Level: zap.NewAtomicLevelAt(zapcore.DebugLevel), | ||
}) | ||
|
||
cluster := unit.NewMultiRackCluster(1) | ||
firstRack := resource.StatefulSetForRack(cluster.Spec.Datacenter.Racks[0], cluster, "image") | ||
|
||
objects := []runtime.Object{firstRack} | ||
kubeClient := fake.NewSimpleClientset(objects...) | ||
|
||
sa := &subAction{ | ||
updateFunc: func(sts *appsv1.StatefulSet) error { | ||
sts.Generation += 1 | ||
return nil | ||
}, | ||
updateState: map[string]bool{ | ||
firstRack.Name: false, | ||
}, | ||
} | ||
a := rackSynchronizedAction{ | ||
subAction: sa, | ||
cluster: cluster, | ||
logger: logger, | ||
} | ||
s := &State{kubeclient: kubeClient} | ||
|
||
if err := a.Execute(ctx, s); err != nil { | ||
t.Fatal(err) | ||
} | ||
expectedUpdates := []string{firstRack.Name} | ||
if !reflect.DeepEqual(sa.updates, expectedUpdates) { | ||
t.Errorf("Expected %s updates, got %s", expectedUpdates, sa.updates) | ||
} | ||
|
||
sts, err := kubeClient.AppsV1().StatefulSets(cluster.Namespace).Get(ctx, firstRack.Name, metav1.GetOptions{}) | ||
if err != nil { | ||
t.Errorf("cannot get statefulset: %s", err) | ||
} | ||
|
||
if sts.Generation == firstRack.Generation { | ||
t.Error("Expected sts update") | ||
} | ||
} | ||
|
||
func TestRackSynchronizedAction_RackAreUpgradedInSequence(t *testing.T) { | ||
ctx := context.Background() | ||
logger, _ := log.NewProduction(log.Config{ | ||
Level: zap.NewAtomicLevelAt(zapcore.DebugLevel), | ||
}) | ||
|
||
cluster := unit.NewMultiRackCluster(1, 1) | ||
firstRack := resource.StatefulSetForRack(cluster.Spec.Datacenter.Racks[0], cluster, "image") | ||
firstRackReady := firstRack.DeepCopy() | ||
firstRackReady.Status.ObservedGeneration = firstRackReady.Generation | ||
firstRackReady.Status.ReadyReplicas = cluster.Spec.Datacenter.Racks[0].Members | ||
secondRack := resource.StatefulSetForRack(cluster.Spec.Datacenter.Racks[1], cluster, "image") | ||
secondRackReady := secondRack.DeepCopy() | ||
secondRackReady.Status.ObservedGeneration = secondRackReady.Generation | ||
secondRackReady.Status.ReadyReplicas = cluster.Spec.Datacenter.Racks[1].Members | ||
|
||
ts := []struct { | ||
Name string | ||
Objects []runtime.Object | ||
State map[string]bool | ||
ExpectedUpdates []string | ||
}{ | ||
{ | ||
Name: "nothing updated", | ||
Objects: []runtime.Object{firstRack, secondRack}, | ||
State: map[string]bool{ | ||
firstRack.Name: false, | ||
secondRack.Name: false, | ||
}, | ||
ExpectedUpdates: []string{firstRack.Name}, | ||
}, | ||
{ | ||
Name: "first rack updated, not ready", | ||
Objects: []runtime.Object{firstRack, secondRack}, | ||
State: map[string]bool{ | ||
firstRack.Name: true, | ||
secondRack.Name: false, | ||
}, | ||
ExpectedUpdates: nil, | ||
}, | ||
{ | ||
Name: "first rack updated and ready", | ||
Objects: []runtime.Object{firstRackReady, secondRack}, | ||
State: map[string]bool{ | ||
firstRackReady.Name: true, | ||
secondRack.Name: false, | ||
}, | ||
ExpectedUpdates: []string{secondRack.Name}, | ||
}, | ||
{ | ||
Name: "all racks updated", | ||
Objects: []runtime.Object{firstRack, secondRack}, | ||
State: map[string]bool{ | ||
firstRack.Name: true, | ||
secondRack.Name: true, | ||
}, | ||
ExpectedUpdates: nil, | ||
}, | ||
{ | ||
Name: "second rack updated, first not", | ||
Objects: []runtime.Object{firstRack, secondRack}, | ||
State: map[string]bool{ | ||
firstRack.Name: false, | ||
secondRack.Name: true, | ||
}, | ||
ExpectedUpdates: []string{firstRack.Name}, | ||
}, | ||
{ | ||
Name: "second rack updated and ready, first not", | ||
Objects: []runtime.Object{firstRack, secondRackReady}, | ||
State: map[string]bool{ | ||
firstRack.Name: false, | ||
secondRack.Name: true, | ||
}, | ||
ExpectedUpdates: []string{firstRack.Name}, | ||
}, | ||
} | ||
|
||
for i := range ts { | ||
test := ts[i] | ||
t.Run(test.Name, func(t *testing.T) { | ||
t.Parallel() | ||
|
||
kubeClient := fake.NewSimpleClientset(test.Objects...) | ||
|
||
sa := &subAction{ | ||
updateState: test.State, | ||
} | ||
a := rackSynchronizedAction{ | ||
subAction: sa, | ||
cluster: cluster, | ||
logger: logger, | ||
} | ||
s := &State{kubeclient: kubeClient} | ||
|
||
if err := a.Execute(ctx, s); err != nil { | ||
t.Fatal(err) | ||
} | ||
if !reflect.DeepEqual(sa.updates, test.ExpectedUpdates) { | ||
t.Errorf("Expected %s updates, got %s", test.ExpectedUpdates, sa.updates) | ||
} | ||
}) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
// Copyright (C) 2021 ScyllaDB | ||
|
||
package actions | ||
|
||
import ( | ||
"github.com/pkg/errors" | ||
"github.com/scylladb/go-log" | ||
scyllav1 "github.com/scylladb/scylla-operator/pkg/api/v1" | ||
"github.com/scylladb/scylla-operator/pkg/naming" | ||
appsv1 "k8s.io/api/apps/v1" | ||
corev1 "k8s.io/api/core/v1" | ||
) | ||
|
||
const ( | ||
SidecarVersionUpgradeAction = "sidecar-upgrade" | ||
) | ||
|
||
type SidecarUpgrade struct { | ||
sidecar corev1.Container | ||
} | ||
|
||
func (a *SidecarUpgrade) RackUpdated(sts *appsv1.StatefulSet) (bool, error) { | ||
sidecarIdx, err := naming.FindSidecarInjectorContainer(sts.Spec.Template.Spec.InitContainers) | ||
if err != nil { | ||
return false, errors.Wrap(err, "find sidecar container in pod") | ||
} | ||
|
||
return sts.Spec.Template.Spec.InitContainers[sidecarIdx].Image == a.sidecar.Image, nil | ||
} | ||
|
||
func (a *SidecarUpgrade) Update(sts *appsv1.StatefulSet) error { | ||
initContainers := sts.Spec.Template.Spec.InitContainers | ||
sidecarIdx, err := naming.FindSidecarInjectorContainer(initContainers) | ||
if err != nil { | ||
return errors.Wrap(err, "find sidecar container in existing sts") | ||
} | ||
|
||
sts.Spec.Template.Spec.InitContainers[sidecarIdx] = a.sidecar | ||
return nil | ||
} | ||
|
||
func NewSidecarUpgrade(c *scyllav1.ScyllaCluster, sidecar corev1.Container, l log.Logger) *rackSynchronizedAction { | ||
return &rackSynchronizedAction{ | ||
subAction: &SidecarUpgrade{ | ||
sidecar: sidecar, | ||
}, | ||
cluster: c, | ||
logger: l, | ||
} | ||
} | ||
|
||
func (a *SidecarUpgrade) Name() string { | ||
return SidecarVersionUpgradeAction | ||
} |
Oops, something went wrong.