Skip to content

Commit

Permalink
Merge pull request #2149 from flyhighzy/preempt-stable-time
Browse files Browse the repository at this point in the history
add cooldown protection plugin
  • Loading branch information
volcano-sh-bot authored Jul 15, 2022
2 parents cb5bffc + 0d0c061 commit 9def57e
Show file tree
Hide file tree
Showing 10 changed files with 496 additions and 7 deletions.
201 changes: 201 additions & 0 deletions docs/user-guide/how_to_use_cdp_plugin.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
# Cooldown Protection Plugin User Guide

## Background
When elastic training or serving is enabled, the pods of a preemptible job can be preempted and brought back to running repeatedly. Without cooldown protection, these pods can be preempted again shortly after they start, which may degrade service stability.
The "cdp" (cooldown protection) plugin therefore ensures that the pods of a preemptible job can run for at least a user-specified amount of time before they become eligible for preemption again.

## Environment setup

### Install volcano

Refer to [Install Guide](../../installer/README.md) to install volcano.

### Update scheduler configmap

After installation, update the scheduler configuration:

```shell
kubectl edit configmap -n volcano-system volcano-scheduler-configmap
```

Register the `cdp` plugin in the configmap and make sure the `preempt` action is enabled:

```yaml
kind: ConfigMap
apiVersion: v1
metadata:
  name: volcano-scheduler-configmap
  namespace: volcano-system
data:
  volcano-scheduler.conf: |
    actions: "enqueue, allocate, preempt, backfill"
    tiers:
    - plugins:
      - name: priority
      - name: gang
      - name: conformance
      - name: cdp
    - plugins:
      - name: drf
      - name: predicates
      - name: task-topology
        arguments:
          task-topology.weight: 10
      - name: proportion
      - name: nodeorder
      - name: binpack
```
### Running Jobs
Take a simple Volcano job as an example.
The original job YAML is shown below; it has a "ps" task and a "worker" task.
```yaml
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: test-job
spec:
  minAvailable: 3
  schedulerName: volcano
  priorityClassName: high-priority
  plugins:
    ssh: []
    env: []
    svc: []
  maxRetry: 5
  queue: default
  volumes:
    - mountPath: "/myinput"
    - mountPath: "/myoutput"
      volumeClaimName: "testvolumeclaimname"
      volumeClaim:
        accessModes: [ "ReadWriteOnce" ]
        storageClassName: "my-storage-class"
        resources:
          requests:
            storage: 1Gi
  tasks:
    - replicas: 6
      name: "worker"
      template:
        metadata:
          name: worker
        spec:
          containers:
            - image: nginx
              imagePullPolicy: IfNotPresent
              name: nginx
              resources:
                requests:
                  cpu: "1"
          restartPolicy: OnFailure
    - replicas: 2
      name: "ps"
      template:
        metadata:
          name: ps
        spec:
          containers:
            - image: nginx
              imagePullPolicy: IfNotPresent
              name: nginx
              resources:
                requests:
                  cpu: "1"
          restartPolicy: OnFailure

```

#### Edit yaml of vcjob

1. Add annotations to the Volcano job in the format below.
   1. The `volcano.sh/preemptable` annotation indicates that the job or task is preemptable.
   2. The `volcano.sh/cooldown-time` annotation sets the cooldown time for the entire job or for a dedicated task. Its value is a duration string such as "600s"; valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".

```yaml
volcano.sh/preemptable: "true"
volcano.sh/cooldown-time: "600s"
```
**Example 1**
Add the annotations to the entire job; both the "ps" and "worker" tasks can then be preempted, and both are covered by cooldown protection.
```yaml
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: test-job
  annotations:
    volcano.sh/preemptable: "true"
    volcano.sh/cooldown-time: "600s"
spec:
  ... # below keep the same
```
**Example 2**
Add the annotations to a dedicated task; as shown below, only the "worker" task can be preempted and is covered by cooldown protection.
```yaml
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: test-job
spec:
  minAvailable: 3
  schedulerName: volcano
  priorityClassName: high-priority
  plugins:
    ssh: []
    env: []
    svc: []
  maxRetry: 5
  queue: default
  volumes:
    - mountPath: "/myinput"
    - mountPath: "/myoutput"
      volumeClaimName: "testvolumeclaimname"
      volumeClaim:
        accessModes: [ "ReadWriteOnce" ]
        storageClassName: "my-storage-class"
        resources:
          requests:
            storage: 1Gi
  tasks:
    - replicas: 6
      name: "worker"
      template:
        metadata:
          name: worker
          annotations: # add annotation in tasks
            volcano.sh/preemptable: "true"
            volcano.sh/cooldown-time: "600s"
        spec:
          containers:
            - image: nginx
              imagePullPolicy: IfNotPresent
              name: nginx
              resources:
                requests:
                  cpu: "1"
          restartPolicy: OnFailure
    - replicas: 2
      name: "ps"
      template:
        metadata:
          name: ps
        spec:
          containers:
            - image: nginx
              imagePullPolicy: IfNotPresent
              name: nginx
              resources:
                requests:
                  cpu: "1"
          restartPolicy: OnFailure

```
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ require (
k8s.io/utils v0.0.0-20210930125809-cb0fa318a74b
sigs.k8s.io/yaml v1.3.0
stathat.com/c/consistent v1.0.0
volcano.sh/apis v0.0.0-20220705062437-edd428c7d2fd
volcano.sh/apis v1.6.0-alpha.0.0.20220712043845-8d8aa5aecbd2
)

require (
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1204,5 +1204,5 @@ sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo=
sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8=
stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c=
stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0=
volcano.sh/apis v0.0.0-20220705062437-edd428c7d2fd h1:b48P7vnI1FHyw2ZaAwtgH8D2vCNyxWJxFU15PqSFmn8=
volcano.sh/apis v0.0.0-20220705062437-edd428c7d2fd/go.mod h1:drNMGuHPn1ew7oBSDQb5KRey6tXOQksbUtw3gPxF3Vo=
volcano.sh/apis v1.6.0-alpha.0.0.20220712043845-8d8aa5aecbd2 h1:8p4FIUbVepYoyxMKxnb6W8PohzweIrIh06YvCHklq78=
volcano.sh/apis v1.6.0-alpha.0.0.20220712043845-8d8aa5aecbd2/go.mod h1:drNMGuHPn1ew7oBSDQb5KRey6tXOQksbUtw3gPxF3Vo=
6 changes: 6 additions & 0 deletions pkg/controllers/job/job_controller_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ func createJobPod(job *batch.Job, template *v1.PodTemplateSpec, topologyPolicy b
if value, found := job.Annotations[schedulingv2.PodPreemptable]; found {
pod.Annotations[schedulingv2.PodPreemptable] = value
}
if value, found := job.Annotations[schedulingv2.CooldownTime]; found {
pod.Annotations[schedulingv2.CooldownTime] = value
}
if value, found := job.Annotations[schedulingv2.RevocableZone]; found {
pod.Annotations[schedulingv2.RevocableZone] = value
}
Expand All @@ -139,6 +142,9 @@ func createJobPod(job *batch.Job, template *v1.PodTemplateSpec, topologyPolicy b
if value, found := job.Labels[schedulingv2.PodPreemptable]; found {
pod.Labels[schedulingv2.PodPreemptable] = value
}
if value, found := job.Labels[schedulingv2.CooldownTime]; found {
pod.Labels[schedulingv2.CooldownTime] = value
}
}

if jobForwarding {
Expand Down
6 changes: 6 additions & 0 deletions pkg/controllers/podgroup/pg_controller_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,18 @@ func (pg *pgcontroller) createNormalPodPGIfNotExist(pod *v1.Pod) error {
if value, ok := pod.Annotations[scheduling.PodPreemptable]; ok {
obj.Annotations[scheduling.PodPreemptable] = value
}
if value, ok := pod.Annotations[scheduling.CooldownTime]; ok {
obj.Annotations[scheduling.CooldownTime] = value
}
if value, ok := pod.Annotations[scheduling.RevocableZone]; ok {
obj.Annotations[scheduling.RevocableZone] = value
}
if value, ok := pod.Labels[scheduling.PodPreemptable]; ok {
obj.Labels[scheduling.PodPreemptable] = value
}
if value, ok := pod.Labels[scheduling.CooldownTime]; ok {
obj.Labels[scheduling.CooldownTime] = value
}

if value, found := pod.Annotations[scheduling.JDBMinAvailable]; found {
obj.Annotations[scheduling.JDBMinAvailable] = value
Expand Down
112 changes: 112 additions & 0 deletions pkg/scheduler/plugins/cdp/cdp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
Copyright 2022 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cdp

import (
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/klog"

"volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/framework"
"volcano.sh/volcano/pkg/scheduler/plugins/util"
)

// PluginName is the name of the cooldown protection ("cdp") plugin.
//
// Background (see https://github.com/volcano-sh/volcano/issues/2075): with
// elastic training or serving, the pods of a preemptible job can be preempted
// and brought back to running repeatedly. Without cooldown protection such
// pods may be preempted again shortly after they start, which can hurt
// service stability. The cdp plugin keeps a vcjob's pods from being preempted
// while they are within their cooldown protection conditions. Currently only
// cooldown-time protection is supported.
const PluginName = "cdp"

// CooldownProtectionPlugin is the scheduler plugin that implements cooldown
// protection. It has no fields and therefore carries no per-instance state.
type CooldownProtectionPlugin struct{}

// New returns a fresh CooldownProtectionPlugin as a framework.Plugin.
// The supplied arguments are not read by this plugin.
func New(arguments framework.Arguments) framework.Plugin {
	return new(CooldownProtectionPlugin)
}

// Name implements framework.Plugin and reports the plugin's name, PluginName.
func (sp *CooldownProtectionPlugin) Name() string {
	return PluginName
}

// podCooldownTime reads the cooldown time configured on pod.
//
// The v1beta1.CooldownTime key is looked up in the pod's labels first and, if
// absent there, in its annotations. The value is parsed with
// time.ParseDuration. It returns (duration, true) on success, and (0, false)
// when the key is missing or the value is not a valid duration (a warning is
// logged in the latter case).
func (sp *CooldownProtectionPlugin) podCooldownTime(pod *v1.Pod) (value time.Duration, enabled bool) {
	// Labels take precedence; annotations are only consulted as a fallback.
	raw, found := pod.Labels[v1beta1.CooldownTime]
	if !found {
		raw, found = pod.Annotations[v1beta1.CooldownTime]
	}
	if !found {
		return 0, false
	}

	cooldown, err := time.ParseDuration(raw)
	if err != nil {
		klog.Warningf("invalid time duration %s=%s", v1beta1.CooldownTime, raw)
		return 0, false
	}
	return cooldown, true
}

// OnSessionOpen implements framework.Plugin.
//
// It registers a preemptable function that filters the preemptee candidates:
// a candidate is withheld from the victim list only when its pod has a valid
// cooldown time, is in the Running phase, and its first PodScheduled condition
// with status True transitioned less than the cooldown time ago. Every other
// candidate is passed through as a victim.
func (sp *CooldownProtectionPlugin) OnSessionOpen(ssn *framework.Session) {
	// coolingDown reports whether pod is still inside its cooldown window.
	// Only running pods are checked; the window is measured from the
	// LastTransitionTime of the first PodScheduled=True condition found.
	coolingDown := func(pod *v1.Pod, cooldown time.Duration) bool {
		if pod.Status.Phase != v1.PodRunning {
			return false
		}
		for _, cond := range pod.Status.Conditions {
			if cond.Type != v1.PodScheduled || cond.Status != v1.ConditionTrue {
				continue
			}
			// Only the first matching condition is consulted.
			return cond.LastTransitionTime.Add(cooldown).After(time.Now())
		}
		return false
	}

	filterFn := func(preemptor *api.TaskInfo, preemptees []*api.TaskInfo) ([]*api.TaskInfo, int) {
		var victims []*api.TaskInfo
		for _, candidate := range preemptees {
			cooldown, protected := sp.podCooldownTime(candidate.Pod)
			if protected && coolingDown(candidate.Pod, cooldown) {
				// Still within the cooldown window: keep it off the victim list.
				continue
			}
			victims = append(victims, candidate)
		}

		klog.V(4).Infof("Victims from cdp plugins are %+v", victims)
		return victims, util.Permit
	}

	klog.V(4).Info("plugin cdp session open")
	ssn.AddPreemptableFn(sp.Name(), filterFn)
}

// OnSessionClose implements framework.Plugin. The plugin has nothing to clean
// up when a session ends, so this is a no-op.
func (sp *CooldownProtectionPlugin) OnSessionClose(ssn *framework.Session) {
}
Loading

0 comments on commit 9def57e

Please sign in to comment.