-
Notifications
You must be signed in to change notification settings - Fork 179
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
controller: added replace dead node feature (#48)
A dead node can be replaced using a special label added to its member service. When the label "scylla/replace" with an empty string value is added to a member service, the Operator removes the PVC attached to the dead node, deletes the dead pod, and removes the member service. A new pod is then scheduled on a different node, and Scylla is started with the additional parameter `--replace-address-first-boot` set to the IP address of the deleted node. Fixes #48
- Loading branch information
Showing
19 changed files
with
688 additions
and
95 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,207 @@ | ||
// Copyright (C) 2017 ScyllaDB | ||
|
||
package actions | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"time" | ||
|
||
"github.com/pkg/errors" | ||
"github.com/scylladb/go-log" | ||
scyllav1alpha1 "github.com/scylladb/scylla-operator/pkg/api/v1alpha1" | ||
"github.com/scylladb/scylla-operator/pkg/controllers/cluster/util" | ||
"github.com/scylladb/scylla-operator/pkg/naming" | ||
corev1 "k8s.io/api/core/v1" | ||
apierrors "k8s.io/apimachinery/pkg/api/errors" | ||
"k8s.io/apimachinery/pkg/util/wait" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
) | ||
|
||
// RackReplaceNodeAction is the identifier under which this action is known
// to the cluster controller (returned by RackReplaceNode.Name).
const RackReplaceNodeAction = "rack-replace-node"

// Implements Action interface (compile-time assertion).
var _ Action = &RackReplaceNode{}
|
||
// RackReplaceNode is the action that replaces a dead Scylla node in a rack:
// it removes the dead member's PVC, Pod and Service so a new pod is
// rescheduled, and later clears the replace bookkeeping once the
// replacement pod is ready.
type RackReplaceNode struct {
	// Rack is the rack whose member services are inspected.
	Rack scyllav1alpha1.RackSpec
	// Cluster is the ScyllaCluster the rack belongs to; its Status is
	// updated with the replace addresses.
	Cluster *scyllav1alpha1.ScyllaCluster
	// Logger receives structured debug/info output for the action.
	Logger log.Logger
}
|
||
func NewRackReplaceNodeAction(r scyllav1alpha1.RackSpec, c *scyllav1alpha1.ScyllaCluster, l log.Logger) *RackReplaceNode { | ||
return &RackReplaceNode{ | ||
Rack: r, | ||
Cluster: c, | ||
Logger: l, | ||
} | ||
} | ||
|
||
func (a *RackReplaceNode) Name() string { | ||
return RackReplaceNodeAction | ||
} | ||
|
||
func (a *RackReplaceNode) Execute(ctx context.Context, s *State) error { | ||
a.Logger.Debug(ctx, "Replace action executed") | ||
|
||
r, c := a.Rack, a.Cluster | ||
|
||
// Find the member to decommission | ||
memberServices := &corev1.ServiceList{} | ||
|
||
err := s.List(ctx, memberServices, &client.ListOptions{ | ||
LabelSelector: naming.RackSelector(r, c), | ||
}) | ||
if err != nil { | ||
return errors.Wrap(err, "failed to list Member Service") | ||
} | ||
|
||
for _, member := range memberServices.Items { | ||
if value, ok := member.Labels[naming.ReplaceLabel]; ok { | ||
if value == "" { | ||
a.Logger.Debug(ctx, "Member needs to be replaced", "member", member.Name) | ||
if err := a.replaceNode(ctx, s, &member); err != nil { | ||
return errors.WithStack(err) | ||
} | ||
} else { | ||
a.Logger.Debug(ctx, "Member is being replaced", "member", member.Name) | ||
if err := a.maybeFinishReplaceNode(ctx, s, &member); err != nil { | ||
return errors.WithStack(err) | ||
} | ||
} | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (a *RackReplaceNode) maybeFinishReplaceNode(ctx context.Context, state *State, member *corev1.Service) error { | ||
r, c, cc := a.Rack, a.Cluster, state.Client | ||
|
||
pod := &corev1.Pod{} | ||
err := cc.Get(ctx, naming.NamespacedName(member.Name, member.Namespace), pod) | ||
if err != nil { | ||
if !apierrors.IsNotFound(err) { | ||
return errors.Wrap(err, "get pod") | ||
} | ||
a.Logger.Info(ctx, "Member Pod not found", "member", member.Name) | ||
} else { | ||
if replaceAddr := member.Labels[naming.ReplaceLabel]; replaceAddr != "" { | ||
a.Logger.Info(ctx, "Replace member Pod found", "member", member.Name, "replace_address", replaceAddr, "ready", podReady(pod)) | ||
if podReady(pod) { | ||
a.Logger.Info(ctx, "Replace member Pod ready, removing replace label", "member", member.Name, "replace_address", replaceAddr) | ||
|
||
old := member.DeepCopy() | ||
delete(member.Labels, naming.ReplaceLabel) | ||
if err := util.PatchService(ctx, old, member, state.kubeclient); err != nil { | ||
return errors.Wrap(err, "error patching member service") | ||
} | ||
|
||
a.Logger.Info(ctx, "Removing replace IP from Cluster status", "member", member.Name) | ||
delete(c.Status.Racks[r.Name].ReplaceAddresses, member.Name) | ||
|
||
state.recorder.Event(c, corev1.EventTypeNormal, naming.SuccessSynced, | ||
fmt.Sprintf("Rack %q replaced %q node", r.Name, member.Name), | ||
) | ||
} | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func podReady(pod *corev1.Pod) bool { | ||
for _, c := range pod.Status.Conditions { | ||
if c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue { | ||
return true | ||
} | ||
} | ||
return false | ||
} | ||
|
||
const (
	// retryInterval is the poll period used by waitForPVC.
	retryInterval = time.Second
	// waitForPVCTimeout bounds how long waitForPVC keeps polling.
	waitForPVCTimeout = 30 * time.Second
)
|
||
func waitForPVC(ctx context.Context, cc client.Client, name, namespace string) error { | ||
pvc := &corev1.PersistentVolumeClaim{} | ||
return wait.PollImmediate(retryInterval, waitForPVCTimeout, func() (bool, error) { | ||
err := cc.Get(ctx, naming.NamespacedName(name, namespace), pvc) | ||
if err != nil { | ||
if apierrors.IsNotFound(err) { | ||
return false, nil | ||
} | ||
return false, err | ||
} | ||
return true, nil | ||
}) | ||
} | ||
|
||
func (a *RackReplaceNode) replaceNode(ctx context.Context, state *State, member *corev1.Service) error { | ||
r, c := a.Rack, a.Cluster | ||
|
||
cc := state.Client | ||
|
||
// Save replace address in RackStatus | ||
rackStatus := c.Status.Racks[r.Name] | ||
rackStatus.ReplaceAddresses[member.Name] = member.Spec.ClusterIP | ||
a.Logger.Debug(ctx, "Adding member address to replace address list", "member", member.Name, "ip", member.Spec.ClusterIP, "replace_addresses", rackStatus.ReplaceAddresses) | ||
|
||
// Proceed to destructive operations only when IP address is saved in cluster Status. | ||
if err := cc.Status().Update(ctx, c); err != nil { | ||
return errors.Wrap(err, "failed to delete pvc") | ||
} | ||
|
||
// Delete PVC if it exists | ||
pvc := &corev1.PersistentVolumeClaim{} | ||
err := cc.Get(ctx, naming.NamespacedName(naming.PVCNameForPod(member.Name), member.Namespace), pvc) | ||
if err != nil { | ||
if !apierrors.IsNotFound(err) { | ||
return errors.Wrap(err, "failed to get pvc") | ||
} | ||
a.Logger.Info(ctx, "Member PVC not found", "member", member.Name) | ||
} else { | ||
a.Logger.Info(ctx, "Deleting member PVC", "member", member.Name, "pvc", pvc.Name) | ||
if err = cc.Delete(ctx, pvc); err != nil { | ||
return errors.Wrap(err, "failed to delete pvc") | ||
} | ||
|
||
// Wait until PVC is deleted, ignore error | ||
a.Logger.Info(ctx, "Waiting for PVC deletion", "member", member.Name, "pvc", pvc.Name) | ||
_ = waitForPVC(ctx, cc, naming.PVCNameForPod(member.Name), member.Namespace) | ||
|
||
state.recorder.Event(c, corev1.EventTypeNormal, naming.SuccessSynced, | ||
fmt.Sprintf("Rack %q removed %q PVC", r.Name, member.Name), | ||
) | ||
} | ||
|
||
// Delete Pod if it exists | ||
pod := &corev1.Pod{} | ||
err = cc.Get(ctx, naming.NamespacedName(member.Name, member.Namespace), pod) | ||
if err != nil { | ||
if !apierrors.IsNotFound(err) { | ||
return errors.Wrap(err, "get pod") | ||
} | ||
a.Logger.Info(ctx, "Member Pod not found", "member", member.Name) | ||
} else { | ||
a.Logger.Info(ctx, "Deleting member Pod", "member", member.Name) | ||
if err = cc.Delete(ctx, pod, client.GracePeriodSeconds(0)); err != nil { | ||
return errors.Wrap(err, "delete pod") | ||
} | ||
state.recorder.Event(c, corev1.EventTypeNormal, naming.SuccessSynced, | ||
fmt.Sprintf("Rack %q removed %q Pod", r.Name, member.Name), | ||
) | ||
} | ||
|
||
a.Logger.Info(ctx, "Deleting member Service", "member", member.Name) | ||
if err := cc.Delete(ctx, member); err != nil { | ||
return errors.Wrap(err, "delete member service") | ||
} | ||
|
||
state.recorder.Event(c, corev1.EventTypeNormal, naming.SuccessSynced, | ||
fmt.Sprintf("Rack %q removed %q Service", r.Name, member.Name), | ||
) | ||
|
||
return nil | ||
} |
Oops, something went wrong.