forked from scylladb/scylla-operator
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
cluster: automated sidecar upgrade (scylladb#187)
Operator sets actual sidecar version in ScyllaCluster status. If it's different than Operator version, sidecar container is updated. Fixes scylladb#187
- Loading branch information
Showing
18 changed files
with
704 additions
and
44 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
// Copyright (C) 2021 ScyllaDB | ||
|
||
package actions | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
"github.com/pkg/errors" | ||
"github.com/scylladb/go-log" | ||
scyllav1 "github.com/scylladb/scylla-operator/pkg/api/v1" | ||
"github.com/scylladb/scylla-operator/pkg/controllers/cluster/util" | ||
"github.com/scylladb/scylla-operator/pkg/naming" | ||
appsv1 "k8s.io/api/apps/v1" | ||
corev1 "k8s.io/api/core/v1" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
) | ||
|
||
type rackSynchronizedSubAction interface { | ||
// RackUpdated should return whether requested update is already applied to a rack sts. | ||
RackUpdated(sts *appsv1.StatefulSet) (bool, error) | ||
// PodUpdated should return whether requested update is already applied to rack pod. | ||
PodUpdated(pod *corev1.Pod) (bool, error) | ||
// Update performs desired update. | ||
Update(sts *appsv1.StatefulSet) error | ||
// Name of action. | ||
Name() string | ||
} | ||
|
||
// rackSynchronizedAction can be used to perform an multi rack update in synchronous manner. | ||
// Racks underlying StatefulSets will be updated one by one. Single Execute will perform at most single StatefulSet update. | ||
// If previous rack pods aren't updated, Execute does nothing. | ||
// Racks are upgraded in the same order as in ScyllaCluster Spec. | ||
type rackSynchronizedAction struct { | ||
subAction rackSynchronizedSubAction | ||
cluster *scyllav1.ScyllaCluster | ||
logger log.Logger | ||
} | ||
|
||
const ( | ||
RackSynchronizedActionPrefix = "rack-synchronized-" | ||
) | ||
|
||
func (a rackSynchronizedAction) Name() string { | ||
return fmt.Sprintf("%s%s", RackSynchronizedActionPrefix, a.subAction.Name()) | ||
} | ||
|
||
func (a rackSynchronizedAction) Execute(ctx context.Context, s *State) error { | ||
for _, rack := range a.cluster.Spec.Datacenter.Racks { | ||
sts, err := util.GetStatefulSetForRack(ctx, rack, a.cluster, s.kubeclient) | ||
if err != nil { | ||
return errors.Wrap(err, "get rack statefulset") | ||
} | ||
|
||
rackUpdated, err := a.subAction.RackUpdated(sts) | ||
if err != nil { | ||
return errors.Wrap(err, "determine if rack needs update") | ||
} | ||
if !rackUpdated { | ||
upgradedSts := sts.DeepCopy() | ||
if err := a.subAction.Update(upgradedSts); err != nil { | ||
return errors.Wrap(err, "update rack") | ||
} | ||
|
||
// Patch rack sts and exit | ||
a.logger.Info(ctx, "Patching rack definition", "rack", rack.Name) | ||
return util.PatchStatefulSet(ctx, sts, upgradedSts, s.kubeclient) | ||
} | ||
|
||
a.logger.Info(ctx, "Rack already upgraded, checking Pods", "rack", rack.Name) | ||
|
||
pods, err := s.kubeclient.CoreV1().Pods(a.cluster.Namespace).List(ctx, metav1.ListOptions{ | ||
LabelSelector: naming.RackSelector(rack, a.cluster).String(), | ||
}) | ||
if err != nil { | ||
return errors.Wrap(err, "get pods") | ||
} | ||
|
||
for _, pod := range pods.Items { | ||
podUpdated, err := a.subAction.PodUpdated(&pod) | ||
if err != nil { | ||
return errors.Wrap(err, "check if pod is updated") | ||
} | ||
if !podUpdated || !podReady(&pod) { | ||
a.logger.Info(ctx, "Rack pod is not updated, awaiting readiness", "rack", rack.Name, "pod", pod.Name, "pod_ready", podReady(&pod), "pod_updated", podUpdated) | ||
return nil | ||
} | ||
a.logger.Debug(ctx, "Rack Pod is up to date", "rack", rack.Name, "pod", pod.Name) | ||
} | ||
a.logger.Info(ctx, "Rack updated", "rack", rack.Name) | ||
} | ||
|
||
return nil | ||
} |
250 changes: 250 additions & 0 deletions
250
pkg/controllers/cluster/actions/rack_synchronized_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,250 @@ | ||
// Copyright (C) 2021 ScyllaDB | ||
|
||
package actions | ||
|
||
import ( | ||
"context" | ||
"reflect" | ||
"testing" | ||
|
||
"github.com/scylladb/go-log" | ||
"github.com/scylladb/scylla-operator/pkg/controllers/cluster/resource" | ||
"github.com/scylladb/scylla-operator/pkg/test/unit" | ||
"go.uber.org/zap" | ||
"go.uber.org/zap/zapcore" | ||
appsv1 "k8s.io/api/apps/v1" | ||
corev1 "k8s.io/api/core/v1" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/apimachinery/pkg/runtime" | ||
"k8s.io/client-go/kubernetes/fake" | ||
) | ||
|
||
type subAction struct { | ||
updateState map[string]bool | ||
updates []string | ||
updateFunc func(sts *appsv1.StatefulSet) error | ||
} | ||
|
||
func (a *subAction) RackUpdated(sts *appsv1.StatefulSet) (bool, error) { | ||
return a.updateState[sts.Name], nil | ||
} | ||
|
||
func (a *subAction) PodUpdated(pod *corev1.Pod) (bool, error) { | ||
return a.updateState[pod.Name], nil | ||
} | ||
|
||
func (a *subAction) Update(sts *appsv1.StatefulSet) error { | ||
a.updates = append(a.updates, sts.Name) | ||
if a.updateFunc != nil { | ||
return a.updateFunc(sts) | ||
} | ||
return nil | ||
} | ||
|
||
func (a subAction) Name() string { | ||
return "fake-sub-action" | ||
} | ||
|
||
func TestRackSynchronizedAction_SubActionUpdatesRack(t *testing.T) { | ||
t.Parallel() | ||
ctx := context.Background() | ||
logger, _ := log.NewProduction(log.Config{ | ||
Level: zap.NewAtomicLevelAt(zapcore.DebugLevel), | ||
}) | ||
|
||
cluster := unit.NewMultiRackCluster(1) | ||
firstRack := resource.StatefulSetForRack(cluster.Spec.Datacenter.Racks[0], cluster, "image") | ||
|
||
objects := []runtime.Object{firstRack} | ||
kubeClient := fake.NewSimpleClientset(objects...) | ||
|
||
sa := &subAction{ | ||
updateFunc: func(sts *appsv1.StatefulSet) error { | ||
sts.Generation += 1 | ||
return nil | ||
}, | ||
updateState: map[string]bool{ | ||
firstRack.Name: false, | ||
}, | ||
} | ||
a := rackSynchronizedAction{ | ||
subAction: sa, | ||
cluster: cluster, | ||
logger: logger, | ||
} | ||
s := &State{kubeclient: kubeClient} | ||
|
||
if err := a.Execute(ctx, s); err != nil { | ||
t.Fatal(err) | ||
} | ||
expectedUpdates := []string{firstRack.Name} | ||
if !reflect.DeepEqual(sa.updates, expectedUpdates) { | ||
t.Errorf("Expected %s updates, got %s", expectedUpdates, sa.updates) | ||
} | ||
|
||
sts, err := kubeClient.AppsV1().StatefulSets(cluster.Namespace).Get(ctx, firstRack.Name, metav1.GetOptions{}) | ||
if err != nil { | ||
t.Errorf("Expected non nil err, got %s", err) | ||
} | ||
|
||
if sts.Generation == firstRack.Generation { | ||
t.Error("Expected sts update") | ||
} | ||
} | ||
|
||
func TestRackSynchronizedAction_WaitUntilAllRackPodsEntersReadyState(t *testing.T) { | ||
t.Parallel() | ||
ctx := context.Background() | ||
logger, _ := log.NewProduction(log.Config{ | ||
Level: zap.NewAtomicLevelAt(zapcore.DebugLevel), | ||
}) | ||
|
||
cluster := unit.NewMultiRackCluster(2, 1) | ||
firstRack := resource.StatefulSetForRack(cluster.Spec.Datacenter.Racks[0], cluster, "image") | ||
secondRack := resource.StatefulSetForRack(cluster.Spec.Datacenter.Racks[1], cluster, "image") | ||
|
||
firstRackPod := unit.PodForRack(0, cluster.Spec.Datacenter.Racks[0], cluster, unit.ReadyPod) | ||
firstRackSecondPod := unit.PodForRack(1, cluster.Spec.Datacenter.Racks[0], cluster) | ||
secondRackPod := unit.PodForRack(0, cluster.Spec.Datacenter.Racks[1], cluster, unit.ReadyPod) | ||
|
||
objects := []runtime.Object{ | ||
firstRack, secondRack, | ||
firstRackPod, firstRackSecondPod, secondRackPod, | ||
} | ||
|
||
kubeClient := fake.NewSimpleClientset(objects...) | ||
|
||
sa := &subAction{ | ||
updateState: map[string]bool{ | ||
firstRack.Name: true, | ||
firstRackPod.Name: true, | ||
firstRackSecondPod.Name: true, | ||
secondRack.Name: false, | ||
secondRackPod.Name: false, | ||
}, | ||
} | ||
a := rackSynchronizedAction{ | ||
subAction: sa, | ||
cluster: cluster, | ||
logger: logger, | ||
} | ||
s := &State{kubeclient: kubeClient} | ||
|
||
if err := a.Execute(ctx, s); err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
expectedUpdates := []string(nil) | ||
if !reflect.DeepEqual(sa.updates, expectedUpdates) { | ||
t.Errorf("Expected %s updates, got %s", expectedUpdates, sa.updates) | ||
} | ||
} | ||
|
||
func TestRackSynchronizedAction_RackAreUpgradedInSequence(t *testing.T) { | ||
ctx := context.Background() | ||
logger, _ := log.NewProduction(log.Config{ | ||
Level: zap.NewAtomicLevelAt(zapcore.DebugLevel), | ||
}) | ||
|
||
cluster := unit.NewMultiRackCluster(1, 1) | ||
firstRack := resource.StatefulSetForRack(cluster.Spec.Datacenter.Racks[0], cluster, "image") | ||
secondRack := resource.StatefulSetForRack(cluster.Spec.Datacenter.Racks[1], cluster, "image") | ||
|
||
firstRackPod := unit.PodForRack(0, cluster.Spec.Datacenter.Racks[0], cluster, unit.ReadyPod) | ||
secondRackPod := unit.PodForRack(0, cluster.Spec.Datacenter.Racks[1], cluster, unit.ReadyPod) | ||
|
||
objects := []runtime.Object{ | ||
firstRack, secondRack, | ||
firstRackPod, secondRackPod, | ||
} | ||
kubeClient := fake.NewSimpleClientset(objects...) | ||
|
||
ts := []struct { | ||
Name string | ||
State map[string]bool | ||
ExpectedUpdates []string | ||
}{ | ||
{ | ||
Name: "nothing updated", | ||
State: map[string]bool{ | ||
firstRack.Name: false, | ||
secondRack.Name: false, | ||
firstRackPod.Name: false, | ||
secondRackPod.Name: false, | ||
}, | ||
ExpectedUpdates: []string{firstRack.Name}, | ||
}, | ||
{ | ||
Name: "rack updated, pod not yet", | ||
State: map[string]bool{ | ||
firstRack.Name: true, | ||
secondRack.Name: false, | ||
firstRackPod.Name: false, | ||
secondRackPod.Name: false, | ||
}, | ||
ExpectedUpdates: nil, | ||
}, | ||
{ | ||
Name: "first rack updated", | ||
State: map[string]bool{ | ||
firstRack.Name: true, | ||
secondRack.Name: false, | ||
firstRackPod.Name: true, | ||
secondRackPod.Name: false, | ||
}, | ||
ExpectedUpdates: []string{secondRack.Name}, | ||
}, | ||
{ | ||
Name: "second rack updated, pod not yet", | ||
State: map[string]bool{ | ||
firstRack.Name: true, | ||
secondRack.Name: true, | ||
firstRackPod.Name: true, | ||
secondRackPod.Name: false, | ||
}, | ||
ExpectedUpdates: nil, | ||
}, | ||
{ | ||
Name: "all racks updated", | ||
State: map[string]bool{ | ||
firstRack.Name: true, | ||
secondRack.Name: true, | ||
firstRackPod.Name: true, | ||
secondRackPod.Name: true, | ||
}, | ||
ExpectedUpdates: nil, | ||
}, | ||
{ | ||
Name: "second rack updated, first not", | ||
State: map[string]bool{ | ||
firstRack.Name: false, | ||
secondRack.Name: true, | ||
firstRackPod.Name: false, | ||
secondRackPod.Name: true, | ||
}, | ||
ExpectedUpdates: []string{firstRack.Name}, | ||
}, | ||
} | ||
|
||
for i := range ts { | ||
test := ts[i] | ||
t.Run(test.Name, func(t *testing.T) { | ||
t.Parallel() | ||
|
||
sa := &subAction{updateState: test.State} | ||
a := rackSynchronizedAction{ | ||
subAction: sa, | ||
cluster: cluster, | ||
logger: logger, | ||
} | ||
s := &State{kubeclient: kubeClient} | ||
|
||
if err := a.Execute(ctx, s); err != nil { | ||
t.Fatal(err) | ||
} | ||
if !reflect.DeepEqual(sa.updates, test.ExpectedUpdates) { | ||
t.Errorf("Expected %s updates, got %s", test.ExpectedUpdates, sa.updates) | ||
} | ||
}) | ||
} | ||
} |
Oops, something went wrong.