Skip to content

Commit

Permalink
race fix
Browse files Browse the repository at this point in the history
  • Loading branch information
jmdeal committed Apr 23, 2024
1 parent 780cd73 commit 259c3eb
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 27 deletions.
4 changes: 3 additions & 1 deletion pkg/controllers/disruption/consolidation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3750,7 +3750,7 @@ var _ = Describe("Consolidation", func() {
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
ExpectExists(ctx, env.Client, nodes[0])
})
It("should not consolidate if the action becomes invalid during the node TTL wait", func() {
FIt("should not consolidate if the action becomes invalid during the node TTL wait", func() {
pod := test.Pod(test.PodOptions{ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
v1beta1.DoNotDisruptAnnotationKey: "true",
Expand Down Expand Up @@ -3788,6 +3788,8 @@ var _ = Describe("Consolidation", func() {
Eventually(finished.Load, 10*time.Second).Should(BeTrue())
wg.Wait()

ExpectReconcileSucceeded(ctx, queue, types.NamespacedName{})

// nothing should be removed since the node is no longer empty
Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(1))
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
Expand Down
4 changes: 4 additions & 0 deletions pkg/controllers/disruption/emptynodeconsolidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ func (c *EmptyNodeConsolidation) ComputeCommand(ctx context.Context, disruptionB
// Get the current representation of the proposed candidates from before the validation timeout
// We do this so that we can re-validate that the candidates that were computed before we made the decision are the same
candidatesToDelete := mapCandidates(cmd.candidates, validationCandidates)
if invalidatedCount := len(cmd.candidates) - len(candidatesToDelete); invalidatedCount != 0 {
logging.FromContext(ctx).Debugf("abandoning empty node consolidation attempt, %d candidates are no longer valid, command is no longer valid, %s", invalidatedCount, cmd)
return Command{}, scheduling.Results{}, nil
}

postValidationMapping, err := BuildDisruptionBudgets(ctx, c.cluster, c.clock, c.kubeClient, c.recorder)
if err != nil {
Expand Down
66 changes: 41 additions & 25 deletions pkg/controllers/disruption/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ func NewValidation(validationPeriod time.Duration, clk clock.Clock, cluster *sta
}
}

//nolint:gocyclo
func (v *Validation) IsValid(ctx context.Context, cmd Command) (bool, error) {
var err error
v.once.Do(func() {
Expand All @@ -80,37 +79,54 @@ func (v *Validation) IsValid(ctx context.Context, cmd Command) (bool, error) {
case <-v.clock.After(waitDuration):
}
}
// Get the current representation of the proposed candidates from before the validation timeout
// We do this so that we can re-validate that the candidates that were computed before we made the decision are the same
// We perform filtering here to ensure that none of the proposed candidates have blocking PDBs or do-not-evict/do-not-disrupt pods scheduled to them
validationCandidates, err := GetCandidates(ctx, v.cluster, v.kubeClient, v.recorder, v.clock, v.cloudProvider, v.ShouldDisrupt, v.queue)
validatedCandidates, err := v.validateCandidates(ctx, cmd.candidates...)
if len(validatedCandidates) == 0 || err != nil {
return false, err
}
isValid, err := v.ValidateCommand(ctx, cmd, validatedCandidates)
if err != nil {
return false, fmt.Errorf("constructing validation candidates, %w", err)
return false, fmt.Errorf("validating command, %w", err)
}
validationCandidates = mapCandidates(cmd.candidates, validationCandidates)
// If we filtered out any candidates, return false as some NodeClaims in the consolidation decision have changed.
if len(validationCandidates) != len(cmd.candidates) {
return false, nil
// Revalidate candidates after validating the command. This mitigates the chance of a race condition outlined in
// the following GitHub issue: https://github.com/kubernetes-sigs/karpenter/issues/1167.
validatedCandidates, err = v.validateCandidates(ctx, validatedCandidates...)
if len(validatedCandidates) == 0 || err != nil {
return false, err
}
// Rebuild the disruption budget mapping to see if any budgets have changed since validation.
postValidationMapping, err := BuildDisruptionBudgets(ctx, v.cluster, v.clock, v.kubeClient, v.recorder)
return isValid, nil
}

// validateCandidates gets the current representation of the provided candidates and ensures that they are all still valid.
// For a candidate to still be valid, the following conditions must be met:
//	a. It must pass the global candidate filtering logic (no blocking PDBs, no do-not-evict/do-not-disrupt pods, etc)
//	b. It must not have any pods nominated for it
//	c. It must still be disruptible without violating node disruption budgets
// If these conditions are met for all candidates, validateCandidates returns a slice with the updated representations.
// Otherwise, validateCandidates returns a nil slice; the error is non-nil only when refreshing state itself failed.
func (v *Validation) validateCandidates(ctx context.Context, candidates ...*Candidate) ([]*Candidate, error) {
	// Refresh the current representation of every disruptable candidate. This re-applies the
	// global filtering logic (ShouldDisrupt) against the cluster's present state.
	validatedCandidates, err := GetCandidates(ctx, v.cluster, v.kubeClient, v.recorder, v.clock, v.cloudProvider, v.ShouldDisrupt, v.queue)
	if err != nil {
		return nil, fmt.Errorf("constructing validation candidates, %w", err)
	}
	// Narrow the refreshed set down to just the candidates we were asked to validate.
	validatedCandidates = mapCandidates(candidates, validatedCandidates)
	// If we filtered out any candidates, return nil as some NodeClaims in the consolidation decision have changed.
	if len(validatedCandidates) != len(candidates) {
		return nil, nil
	}
	// Rebuild the disruption budget mapping to see if any budgets have changed since the command was computed.
	budgetMapping, err := BuildDisruptionBudgets(ctx, v.cluster, v.clock, v.kubeClient, v.recorder)
	if err != nil {
		return nil, fmt.Errorf("building disruption budgets, %w", err)
	}
	// Return nil if any candidate meets either of the following conditions:
	//	a. A pod was nominated to the candidate
	//	b. Disrupting the candidate would violate node disruption budgets
	for _, vc := range validatedCandidates {
		if v.cluster.IsNodeNominated(vc.ProviderID()) || budgetMapping[vc.nodePool.Name] == 0 {
			return nil, nil
		}
		// Consume one unit of the nodepool's budget so later candidates in the same
		// nodepool are checked against the remaining allowance.
		budgetMapping[vc.nodePool.Name]--
	}
	return validatedCandidates, nil
}

// ShouldDisrupt is a predicate used to filter candidates
Expand Down
4 changes: 3 additions & 1 deletion pkg/utils/pod/scheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,12 @@ func IsReschedulable(pod *v1.Pod) bool {
// IsEvictable checks if a pod is evictable by Karpenter by ensuring that the pod:
// - Is an active pod (isn't terminal or actively terminating)
// - Doesn't tolerate the "karpenter.sh/disruption=disrupting" taint
// - Isn't a mirror pod (https://kubernetes.io/docs/tasks/configure-pod-container/static-pod/)
// - Doesn't have the "karpenter.sh/do-not-disrupt=true" annotation
func IsEvictable(pod *v1.Pod) bool {
	return IsActive(pod) &&
		!ToleratesDisruptionNoScheduleTaint(pod) &&
		!IsOwnedByNode(pod) &&
		IsDisruptable(pod)
}

// IsWaitingEviction checks if this is a pod that we are waiting to be removed from the node by ensuring that the pod:
Expand Down

0 comments on commit 259c3eb

Please sign in to comment.