Skip to content

Commit

Permalink
Interruptible node affinity and anti-affinity (flyteorg#199)
Browse files Browse the repository at this point in the history
* Use node affinity for adding node selector requirements for interruptible and non-interruptible tasks

Signed-off-by: Jeev B <[email protected]>

* refactor

Signed-off-by: Jeev B <[email protected]>
  • Loading branch information
jeevb authored Aug 19, 2021
1 parent 8b37310 commit a2fa41f
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 0 deletions.
5 changes: 5 additions & 0 deletions go/tasks/pluginmachinery/flytek8s/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,12 @@ type K8sPluginConfig struct {
InterruptibleTolerations []v1.Toleration `json:"interruptible-tolerations" pflag:"-,Tolerations to be applied for interruptible pods"`
// Node Selector Labels for interruptible pods: Similar to InterruptibleTolerations, these node selector labels are added for pods that can tolerate
// eviction.
//
// Deprecated: Use InterruptibleNodeSelectorRequirement and NonInterruptibleNodeSelectorRequirement instead.
InterruptibleNodeSelector map[string]string `json:"interruptible-node-selector" pflag:"-,Defines a set of node selector labels to add to the interruptible pods."`
// Node Selector Requirements to be added to interruptible and non-interruptible
// pods respectively
InterruptibleNodeSelectorRequirement *v1.NodeSelectorRequirement `json:"interruptible-node-selector-requirement" pflag:"-,Node selector requirement to add to interruptible pods"`
NonInterruptibleNodeSelectorRequirement *v1.NodeSelectorRequirement `json:"non-interruptible-node-selector-requirement" pflag:"-,Node selector requirement to add to non-interruptible pods"`

// ----------------------------------------------------------------------
// Specific tolerations that are added for certain resources. Useful for maintaining gpu resources separate in the cluster
Expand Down
36 changes: 36 additions & 0 deletions go/tasks/pluginmachinery/flytek8s/pod_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,41 @@ const OOMKilled = "OOMKilled"
const Interrupted = "Interrupted"
const SIGKILL = 137

// ApplyInterruptibleNodeAffinity adds the node selector requirement configured for
// interruptible (or non-interruptible) tasks to the pod's required node affinity.
//
// The requirement is read from the plugin config field
// InterruptibleNodeSelectorRequirement when interruptible is true and from
// NonInterruptibleNodeSelectorRequirement otherwise. If that requirement is not
// configured, or podSpec is nil, the pod spec is left untouched.
//
// When the pod already carries required node selector terms, the requirement is
// appended to the match expressions of every term (Kubernetes ORs the terms and
// ANDs the expressions inside a term, so each term must carry it). Otherwise a
// single term holding only the requirement is created.
//
// podSpec.Affinity is replaced by a modified deep copy instead of being edited in
// place: UpdatePod assigns the shared DefaultAffinity pointer from the plugin
// config to podSpec.Affinity right before calling this function, so an in-place
// edit would leak the requirement into the configuration and pile up on every
// subsequent pod.
func ApplyInterruptibleNodeAffinity(interruptible bool, podSpec *v1.PodSpec) {
	if podSpec == nil {
		return
	}

	// Pick the requirement matching the task's interruptibility.
	cfg := config.GetK8sPluginConfig()
	configured := cfg.NonInterruptibleNodeSelectorRequirement
	if interruptible {
		configured = cfg.InterruptibleNodeSelectorRequirement
	}
	if configured == nil {
		return
	}

	// Work on a private copy so that an affinity shared with the plugin config
	// (or with another pod spec) is never modified. DeepCopy is nil-safe.
	affinity := podSpec.Affinity.DeepCopy()
	if affinity == nil {
		affinity = &v1.Affinity{}
	}
	if affinity.NodeAffinity == nil {
		affinity.NodeAffinity = &v1.NodeAffinity{}
	}
	if affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
		affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution = &v1.NodeSelector{}
	}
	selector := affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution

	if len(selector.NodeSelectorTerms) == 0 {
		selector.NodeSelectorTerms = []v1.NodeSelectorTerm{
			{MatchExpressions: []v1.NodeSelectorRequirement{*configured.DeepCopy()}},
		}
	} else {
		for i := range selector.NodeSelectorTerms {
			term := &selector.NodeSelectorTerms[i]
			// Each term gets its own copy so the Values slice is not shared with
			// the plugin config or between terms.
			term.MatchExpressions = append(term.MatchExpressions, *configured.DeepCopy())
		}
	}

	podSpec.Affinity = affinity
}

// UpdatePod updates the base pod spec used to execute tasks, applying options from the plugin configuration and the task execution metadata.
func UpdatePod(taskExecutionMetadata pluginsCore.TaskExecutionMetadata,
resourceRequirements []v1.ResourceRequirements, podSpec *v1.PodSpec) {
Expand All @@ -44,6 +79,7 @@ func UpdatePod(taskExecutionMetadata pluginsCore.TaskExecutionMetadata,
if podSpec.Affinity == nil {
podSpec.Affinity = config.GetK8sPluginConfig().DefaultAffinity
}
ApplyInterruptibleNodeAffinity(taskExecutionMetadata.IsInterruptible(), podSpec)
}

func ToK8sPodSpec(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (*v1.PodSpec, error) {
Expand Down
115 changes: 115 additions & 0 deletions go/tasks/pluginmachinery/flytek8s/pod_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,95 @@ func TestPodSetup(t *testing.T) {
err := configAccessor.UpdateConfig(context.TODO())
assert.NoError(t, err)

t.Run("ApplyInterruptibleNodeAffinity", TestApplyInterruptibleNodeAffinity)
t.Run("UpdatePod", updatePod)
t.Run("ToK8sPodInterruptible", toK8sPodInterruptible)
}

// TestApplyInterruptibleNodeAffinity verifies the required node selector terms
// that ApplyInterruptibleNodeAffinity produces for interruptible and
// non-interruptible pods, with and without a pre-existing node affinity.
//
// The expected requirements mirror the values in the test configuration
// (key "x/interruptible"), so the plugin config must already be loaded;
// TestPodSetup runs this function as a subtest after updating the config.
func TestApplyInterruptibleNodeAffinity(t *testing.T) {
	interruptibleRequirement := v1.NodeSelectorRequirement{
		Key:      "x/interruptible",
		Operator: v1.NodeSelectorOpIn,
		Values:   []string{"true"},
	}
	nonInterruptibleRequirement := v1.NodeSelectorRequirement{
		Key:      "x/interruptible",
		Operator: v1.NodeSelectorOpDoesNotExist,
	}
	preexistingRequirement := v1.NodeSelectorRequirement{
		Key:      "node selector requirement",
		Operator: v1.NodeSelectorOpIn,
		Values:   []string{"exists"},
	}

	scenarios := []struct {
		name          string
		interruptible bool
		podSpec       v1.PodSpec
		expectedTerms []v1.NodeSelectorTerm
	}{
		{
			name:          "WithInterruptibleNodeSelectorRequirement",
			interruptible: true,
			podSpec:       v1.PodSpec{},
			expectedTerms: []v1.NodeSelectorTerm{
				{MatchExpressions: []v1.NodeSelectorRequirement{interruptibleRequirement}},
			},
		},
		{
			name:          "WithNonInterruptibleNodeSelectorRequirement",
			interruptible: false,
			podSpec:       v1.PodSpec{},
			expectedTerms: []v1.NodeSelectorTerm{
				{MatchExpressions: []v1.NodeSelectorRequirement{nonInterruptibleRequirement}},
			},
		},
		{
			name:          "WithExistingAffinityWithInterruptibleNodeSelectorRequirement",
			interruptible: true,
			podSpec: v1.PodSpec{
				Affinity: &v1.Affinity{
					NodeAffinity: &v1.NodeAffinity{
						RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
							NodeSelectorTerms: []v1.NodeSelectorTerm{
								{MatchExpressions: []v1.NodeSelectorRequirement{preexistingRequirement}},
							},
						},
					},
				},
			},
			// The new requirement is expected after the pre-existing one in the same term.
			expectedTerms: []v1.NodeSelectorTerm{
				{MatchExpressions: []v1.NodeSelectorRequirement{preexistingRequirement, interruptibleRequirement}},
			},
		},
	}

	for idx := range scenarios {
		scenario := &scenarios[idx]
		t.Run(scenario.name, func(t *testing.T) {
			ApplyInterruptibleNodeAffinity(scenario.interruptible, &scenario.podSpec)
			actualTerms := scenario.podSpec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms
			assert.EqualValues(t, scenario.expectedTerms, actualTerms)
		})
	}
}

func updatePod(t *testing.T) {
taskExecutionMetadata := dummyTaskExecutionMetadata(&v1.ResourceRequirements{
Limits: v1.ResourceList{
Expand Down Expand Up @@ -150,6 +235,21 @@ func updatePod(t *testing.T) {
"x/interruptible": "true",
"user": "also configured",
}, pod.Spec.NodeSelector)
assert.EqualValues(
t,
[]v1.NodeSelectorTerm{
v1.NodeSelectorTerm{
MatchExpressions: []v1.NodeSelectorRequirement{
v1.NodeSelectorRequirement{
Key: "x/interruptible",
Operator: v1.NodeSelectorOpIn,
Values: []string{"true"},
},
},
},
},
pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms,
)
}

func toK8sPodInterruptible(t *testing.T) {
Expand All @@ -174,6 +274,21 @@ func toK8sPodInterruptible(t *testing.T) {
assert.Equal(t, "interruptible", p.Tolerations[1].Value)
assert.Equal(t, 1, len(p.NodeSelector))
assert.Equal(t, "true", p.NodeSelector["x/interruptible"])
assert.EqualValues(
t,
[]v1.NodeSelectorTerm{
v1.NodeSelectorTerm{
MatchExpressions: []v1.NodeSelectorRequirement{
v1.NodeSelectorRequirement{
Key: "x/interruptible",
Operator: v1.NodeSelectorOpIn,
Values: []string{"true"},
},
},
},
},
p.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms,
)
}

func TestToK8sPod(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions go/tasks/pluginmachinery/flytek8s/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ plugins:
value: interruptible
operator: Equal
effect: NoSchedule
interruptible-node-selector-requirement:
key: x/interruptible
operator: In
values:
- "true"
non-interruptible-node-selector-requirement:
key: x/interruptible
operator: DoesNotExist
default-env-vars:
- AWS_METADATA_SERVICE_TIMEOUT: 5
- AWS_METADATA_SERVICE_NUM_ATTEMPTS: 20
Expand Down

0 comments on commit a2fa41f

Please sign in to comment.