fix: validate disruption command after tainting
jmdeal committed Apr 11, 2024
1 parent 43da360 commit e4d93cc
Showing 8 changed files with 166 additions and 14 deletions.
67 changes: 53 additions & 14 deletions pkg/controllers/disruption/controller.go
@@ -58,8 +58,13 @@ type Controller struct {
lastRun map[string]time.Time
}

// pollingPeriod that we inspect cluster to look for opportunities to disrupt
const pollingPeriod = 10 * time.Second
const (
// pollingPeriod that we inspect cluster to look for opportunities to disrupt
pollingPeriod = 10 * time.Second

// commandValidationDelay is the duration spent waiting after tainting candidate nodes before validating the command
commandValidationDelay = 5 * time.Second
)

var errCandidateDeleting = fmt.Errorf("candidate is deleting")

@@ -180,11 +185,13 @@ func (c *Controller) disrupt(ctx context.Context, disruption Method) (bool, erro

// executeCommand will do the following, untainting if the step fails.
// 1. Taint candidate nodes
// 2. Spin up replacement nodes
// 3. Add Command to orchestration.Queue to wait to delete the candidates.
// 2. Validate the command
// 3. Spin up replacement nodes
// 4. Add Command to orchestration.Queue to wait to delete the candidates.
func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command, schedulingResults scheduling.Results) error {

Check failure on line 191 in pkg/controllers/disruption/controller.go (GitHub Actions / presubmit, 1.23.x through 1.29.x): cyclomatic complexity 13 of func `(*Controller).executeCommand` is high (> 11) (gocyclo)
commandID := uuid.NewUUID()
logging.FromContext(ctx).With("command-id", commandID).Infof("disrupting via %s %s", m.Type(), cmd)
ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("command-id", commandID))
logging.FromContext(ctx).Infof("disrupting via %s %s", m.Type(), cmd)

stateNodes := lo.Map(cmd.candidates, func(c *Candidate, _ int) *state.StateNode {
return c.StateNode
@@ -194,10 +201,42 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command,
return multierr.Append(fmt.Errorf("tainting nodes (command-id: %s), %w", commandID, err), state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...))
}

select {
case <-ctx.Done():
return fmt.Errorf("interrupted (command-id: %s)", commandID)
case <-c.clock.After(commandValidationDelay):
}

// Revalidate the proposed command after the node has been tainted.
// This addresses a race condition where pods can schedule to the candidate nodes after the command has been finalized, but before the candidates were tainted.
candidates, err := GetCandidates(ctx, c.cluster, c.kubeClient, c.recorder, c.clock, c.cloudProvider, m.ShouldDisrupt, c.queue)
if err != nil {
return fmt.Errorf("validating command, determining candidates, %w", err)
}
disruptionBudgetMapping, err := BuildDisruptionBudgets(ctx, c.cluster, c.clock, c.kubeClient, c.recorder)
if err != nil {
return fmt.Errorf("validating command, building disruption budgets, %w", err)
}
validatedCommand := m.ValidateCommand(ctx, cmd, disruptionBudgetMapping, candidates...)
if len(validatedCommand.candidates) == 0 {
logging.FromContext(ctx).Infof("abandoning disruption attempt, 0 out of %d nodes are still valid candidates", len(cmd.candidates))
return nil
}
if len(validatedCommand.candidates) != len(cmd.candidates) {
invalidatedCandidates := lo.FilterMap(cmd.candidates, func(candidate *Candidate, _ int) (*state.StateNode, bool) {
return candidate.StateNode, !lo.ContainsBy(validatedCommand.candidates, func(c *Candidate) bool {
return c.Name() == candidate.Name()
})
})
if err = state.RequireNoScheduleTaint(ctx, c.kubeClient, false, invalidatedCandidates...); err != nil {
return fmt.Errorf("untainting invalidated candidates, %w", err)
}
logging.FromContext(ctx).Infof("updated disruption command after validation, disrupting via %s %s", m.Type(), validatedCommand)
}

var nodeClaimNames []string
var err error
if len(cmd.replacements) > 0 {
if nodeClaimNames, err = c.createReplacementNodeClaims(ctx, m, cmd); err != nil {
if len(validatedCommand.replacements) > 0 {
if nodeClaimNames, err = c.createReplacementNodeClaims(ctx, m, validatedCommand); err != nil {
// If we failed to launch the replacement, don't disrupt. If this is some permanent failure,
// we don't want to disrupt workloads with no way to provision new nodes for them.
return multierr.Append(fmt.Errorf("launching replacement nodeclaim (command-id: %s), %w", commandID, err), state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...))
Expand All @@ -215,32 +254,32 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command,
// the node is cleaned up.
schedulingResults.Record(logging.WithLogger(ctx, operatorlogging.NopLogger), c.recorder, c.cluster)

providerIDs := lo.Map(cmd.candidates, func(c *Candidate, _ int) string { return c.ProviderID() })
providerIDs := lo.Map(validatedCommand.candidates, func(c *Candidate, _ int) string { return c.ProviderID() })
// We have the new NodeClaims created at the API server so mark the old NodeClaims for deletion
c.cluster.MarkForDeletion(providerIDs...)

if err := c.queue.Add(orchestration.NewCommand(nodeClaimNames,
lo.Map(cmd.candidates, func(c *Candidate, _ int) *state.StateNode { return c.StateNode }), commandID, m.Type(), m.ConsolidationType())); err != nil {
lo.Map(validatedCommand.candidates, func(c *Candidate, _ int) *state.StateNode { return c.StateNode }), commandID, m.Type(), m.ConsolidationType())); err != nil {
c.cluster.UnmarkForDeletion(providerIDs...)
return fmt.Errorf("adding command to queue (command-id: %s), %w", commandID, multierr.Append(err, state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...)))
}

// An action is only performed and pods/nodes are only disrupted after a successful add to the queue
ActionsPerformedCounter.With(map[string]string{
actionLabel: string(cmd.Action()),
actionLabel: string(validatedCommand.Action()),
methodLabel: m.Type(),
consolidationTypeLabel: m.ConsolidationType(),
}).Inc()
for _, cd := range cmd.candidates {
for _, cd := range validatedCommand.candidates {
NodesDisruptedCounter.With(map[string]string{
metrics.NodePoolLabel: cd.nodePool.Name,
actionLabel: string(cmd.Action()),
actionLabel: string(validatedCommand.Action()),
methodLabel: m.Type(),
consolidationTypeLabel: m.ConsolidationType(),
}).Inc()
PodsDisruptedCounter.With(map[string]string{
metrics.NodePoolLabel: cd.nodePool.Name,
actionLabel: string(cmd.Action()),
actionLabel: string(validatedCommand.Action()),
methodLabel: m.Type(),
consolidationTypeLabel: m.ConsolidationType(),
}).Add(float64(len(cd.reschedulablePods)))
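Before the per-method changes below, a condensed sketch of the control flow this commit gives executeCommand may help: taint the candidates, wait commandValidationDelay, recompute candidates and budgets, untaint anything that no longer validates, and only then proceed. The sketch is self-contained Go with simplified stand-in types and helpers (candidate, taint, taintRemove, disrupt); none of these are the real Karpenter APIs.

```go
// Sketch only: simplified stand-ins for Karpenter's disruption types and helpers.
package main

import (
	"context"
	"fmt"
	"time"
)

type candidate struct {
	name  string
	valid bool // stands in for "still a valid disruption candidate after revalidation"
}

// Mirrors commandValidationDelay from the diff above.
const validationDelay = 5 * time.Second

// executeCommand illustrates the new ordering: taint, wait, revalidate,
// untaint anything that is no longer a valid candidate, then disrupt the rest.
func executeCommand(ctx context.Context, cands []candidate) error {
	taint(cands...)

	// Wait so that pods which raced onto the candidate nodes before the taint
	// landed are observed by the revalidation below.
	select {
	case <-ctx.Done():
		return fmt.Errorf("interrupted")
	case <-time.After(validationDelay):
	}

	var stillValid, invalidated []candidate
	for _, c := range cands {
		if c.valid {
			stillValid = append(stillValid, c)
		} else {
			invalidated = append(invalidated, c)
		}
	}
	taintRemove(invalidated...)

	if len(stillValid) == 0 {
		fmt.Printf("abandoning disruption attempt, 0 out of %d nodes are still valid candidates\n", len(cands))
		return nil
	}
	return disrupt(stillValid...)
}

func taint(cs ...candidate)       { fmt.Println("tainting", len(cs), "node(s)") }
func taintRemove(cs ...candidate) { fmt.Println("untainting", len(cs), "invalidated node(s)") }
func disrupt(cs ...candidate) error {
	for _, c := range cs {
		fmt.Println("disrupting", c.name)
	}
	return nil
}

func main() {
	_ = executeCommand(context.Background(), []candidate{
		{name: "node-a", valid: true},
		{name: "node-b", valid: false}, // e.g. a pod scheduled here during the delay
	})
}
```

Running the sketch prints the taint, untaint, and disrupt steps in order, with node-b dropped after the delay while node-a is still disrupted.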
22 changes: 22 additions & 0 deletions pkg/controllers/disruption/drift.go
@@ -21,6 +21,7 @@ import (
"errors"
"sort"

"github.com/samber/lo"
"sigs.k8s.io/controller-runtime/pkg/client"

"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
@@ -119,6 +120,27 @@ func (d *Drift) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[
return Command{}, scheduling.Results{}, nil
}

func (d *Drift) ValidateCommand(ctx context.Context, cmd Command, disruptionBudgetMapping map[string]int, candidates ...*Candidate) Command {
cmd.candidates = lo.Reject(cmd.candidates, func(candidate *Candidate, _ int) bool {
currentCandidate, ok := lo.Find(candidates, func(c *Candidate) bool {
return c.Name() == candidate.Name()
})
if !ok {
return true
}
// Filter out any empty candidates which are no longer empty since a scheduling simulation was not performed
if len(candidate.reschedulablePods) == 0 && len(currentCandidate.reschedulablePods) != 0 {
return true
}
if disruptionBudgetMapping[candidate.nodePool.Name] == 0 {
return true
}
disruptionBudgetMapping[candidate.nodePool.Name] -= 1
return false
})
return cmd
}

func (d *Drift) Type() string {
return metrics.DriftReason
}
21 changes: 21 additions & 0 deletions pkg/controllers/disruption/emptiness.go
@@ -90,6 +90,27 @@ func (e *Emptiness) ComputeCommand(_ context.Context, disruptionBudgetMapping ma
}, scheduling.Results{}, nil
}

func (e *Emptiness) ValidateCommand(ctx context.Context, cmd Command, disruptionBudgetMapping map[string]int, candidates ...*Candidate) Command {
cmd.candidates = lo.Reject(cmd.candidates, func(candidate *Candidate, _ int) bool {
currentCandidate, ok := lo.Find(candidates, func(c *Candidate) bool {
return c.Name() == candidate.Name()
})
if !ok {
return true
}
// Filter out any empty candidates which are no longer empty
if len(currentCandidate.reschedulablePods) != 0 {
return true
}
if disruptionBudgetMapping[candidate.nodePool.Name] == 0 {
return true
}
disruptionBudgetMapping[candidate.nodePool.Name] -= 1
return false
})
return cmd
}

func (e *Emptiness) Type() string {
return metrics.EmptinessReason
}
22 changes: 22 additions & 0 deletions pkg/controllers/disruption/emptynodeconsolidation.go
@@ -21,6 +21,7 @@ import (
"errors"
"fmt"

"github.com/samber/lo"
"knative.dev/pkg/logging"

"sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling"
@@ -116,6 +117,27 @@ func (c *EmptyNodeConsolidation) ComputeCommand(ctx context.Context, disruptionB
return cmd, scheduling.Results{}, nil
}

func (e *EmptyNodeConsolidation) ValidateCommand(ctx context.Context, cmd Command, disruptionBudgetMapping map[string]int, candidates ...*Candidate) Command {
cmd.candidates = lo.Reject(cmd.candidates, func(candidate *Candidate, _ int) bool {
currentCandidate, ok := lo.Find(candidates, func(c *Candidate) bool {
return c.Name() == candidate.Name()
})
if !ok {
return true
}
// Filter out any empty candidates which are no longer empty since a scheduling simulation was not performed
if len(currentCandidate.reschedulablePods) != 0 {
return true
}
if disruptionBudgetMapping[candidate.nodePool.Name] == 0 {
return true
}
disruptionBudgetMapping[candidate.nodePool.Name] -= 1
return false
})
return cmd
}

func (c *EmptyNodeConsolidation) Type() string {
return metrics.ConsolidationReason
}
22 changes: 22 additions & 0 deletions pkg/controllers/disruption/expiration.go
@@ -21,6 +21,7 @@ import (
"errors"
"sort"

"github.com/samber/lo"
"k8s.io/utils/clock"

"knative.dev/pkg/logging"
@@ -125,6 +126,27 @@ func (e *Expiration) ComputeCommand(ctx context.Context, disruptionBudgetMapping
return Command{}, scheduling.Results{}, nil
}

func (e *Expiration) ValidateCommand(ctx context.Context, cmd Command, disruptionBudgetMapping map[string]int, candidates ...*Candidate) Command {
cmd.candidates = lo.Reject(cmd.candidates, func(candidate *Candidate, _ int) bool {
currentCandidate, ok := lo.Find(candidates, func(c *Candidate) bool {
return c.Name() == candidate.Name()
})
if !ok {
return true
}
// Filter out any empty candidates which are no longer empty since a scheduling simulation was not performed
if len(candidate.reschedulablePods) == 0 && len(currentCandidate.reschedulablePods) != 0 {
return true
}
if disruptionBudgetMapping[candidate.nodePool.Name] == 0 {
return true
}
disruptionBudgetMapping[candidate.nodePool.Name] -= 1
return false
})
return cmd
}

func (e *Expiration) Type() string {
return metrics.ExpirationReason
}
14 changes: 14 additions & 0 deletions pkg/controllers/disruption/multinodeconsolidation.go
@@ -104,6 +104,20 @@ func (m *MultiNodeConsolidation) ComputeCommand(ctx context.Context, disruptionB
return cmd, results, nil
}

func (m *MultiNodeConsolidation) ValidateCommand(ctx context.Context, cmd Command, disruptionBudgetMapping map[string]int, candidates ...*Candidate) Command {
candidates = mapCandidates(cmd.candidates, candidates)
if len(candidates) != len(cmd.candidates) {
return Command{}
}
for _, candidate := range cmd.candidates {
if disruptionBudgetMapping[candidate.nodePool.Name] == 0 {
return Command{}
}
disruptionBudgetMapping[candidate.nodePool.Name] -= 1
}
return cmd
}

// firstNConsolidationOption looks at the first N NodeClaims to determine if they can all be consolidated at once. The
// NodeClaims are sorted by increasing disruption order which correlates to likelihood of being able to consolidate the node
func (m *MultiNodeConsolidation) firstNConsolidationOption(ctx context.Context, candidates []*Candidate, max int) (Command, scheduling.Results, error) {
11 changes: 11 additions & 0 deletions pkg/controllers/disruption/singlenodeconsolidation.go
@@ -97,6 +97,17 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption
return Command{}, scheduling.Results{}, nil
}

func (s *SingleNodeConsolidation) ValidateCommand(ctx context.Context, cmd Command, disruptionBudgetMapping map[string]int, candidates ...*Candidate) Command {
candidates = mapCandidates(cmd.candidates, candidates)
if len(candidates) != 1 {
return Command{}
}
if disruptionBudgetMapping[cmd.candidates[0].nodePool.Name] == 0 {
return Command{}
}
return cmd
}

func (s *SingleNodeConsolidation) Type() string {
return metrics.ConsolidationReason
}
1 change: 1 addition & 0 deletions pkg/controllers/disruption/types.go
@@ -41,6 +41,7 @@ import (
type Method interface {
ShouldDisrupt(context.Context, *Candidate) bool
ComputeCommand(context.Context, map[string]int, ...*Candidate) (Command, scheduling.Results, error)
ValidateCommand(context.Context, Command, map[string]int, ...*Candidate) Command
Type() string
ConsolidationType() string
}
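The per-method ValidateCommand implementations above share one shape: re-find each of the command's candidates in the freshly computed candidate list, drop any candidate that has disappeared or whose NodePool has no disruption budget left, and decrement the budget for those that remain (the multi-node and single-node consolidation variants instead abandon the whole command). A standalone sketch of that shared shape follows, with simplified types rather than the real signatures; candidate, command, and validateCommand here are illustrative stand-ins.

```go
package main

import "fmt"

// Simplified stand-ins for the disruption types; not the real Karpenter API.
type candidate struct {
	name     string
	nodePool string
}

type command struct {
	candidates []*candidate
}

// validateCommand keeps only candidates that still appear in the fresh
// candidate list and whose NodePool still has disruption budget, decrementing
// the budget for each candidate it keeps.
func validateCommand(cmd command, budget map[string]int, fresh []*candidate) command {
	var kept []*candidate
	for _, old := range cmd.candidates {
		found := false
		for _, cur := range fresh {
			if cur.name == old.name {
				found = true
				break
			}
		}
		if !found || budget[old.nodePool] == 0 {
			continue // invalidated: dropped from the command (and untainted by the caller)
		}
		budget[old.nodePool]--
		kept = append(kept, old)
	}
	return command{candidates: kept}
}

func main() {
	cmd := command{candidates: []*candidate{
		{name: "node-a", nodePool: "pool-1"},
		{name: "node-b", nodePool: "pool-1"},
	}}
	// node-b is gone from the fresh list and pool-1 only has budget for one node.
	fresh := []*candidate{{name: "node-a", nodePool: "pool-1"}}
	out := validateCommand(cmd, map[string]int{"pool-1": 1}, fresh)
	fmt.Printf("%d candidate(s) remain after validation\n", len(out.candidates)) // prints 1
}
```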
