diff --git a/.github/workflows/code_verify.yaml b/.github/workflows/code_verify.yaml
index 8afb103c91d..f8db533780f 100644
--- a/.github/workflows/code_verify.yaml
+++ b/.github/workflows/code_verify.yaml
@@ -18,7 +18,7 @@ jobs:
- name: Install Go
uses: actions/setup-go@v2
with:
- go-version: 1.17.x
+ go-version: 1.18.x
- name: Checkout code
uses: actions/checkout@v2
diff --git a/.github/workflows/e2e_parallel_jobs.yaml b/.github/workflows/e2e_parallel_jobs.yaml
index 341ae68062c..994ef1ddf72 100644
--- a/.github/workflows/e2e_parallel_jobs.yaml
+++ b/.github/workflows/e2e_parallel_jobs.yaml
@@ -16,7 +16,7 @@ jobs:
- name: Install Go
uses: actions/setup-go@v2
with:
- go-version: 1.17.x
+ go-version: 1.18.x
- name: Install musl
run: |
@@ -31,7 +31,7 @@ jobs:
- name: Install dependences
run: |
- GO111MODULE="on" go get sigs.k8s.io/kind@v0.11.0
+ GO111MODULE="on" go install sigs.k8s.io/kind@v0.11.0
curl -LO https://storage.googleapis.com/kubernetes-release/release/v1.23.0/bin/linux/amd64/kubectl && sudo install kubectl /usr/local/bin/kubectl
- name: Checkout code
uses: actions/checkout@v2
diff --git a/.github/workflows/e2e_scheduling_actions.yaml b/.github/workflows/e2e_scheduling_actions.yaml
index e7a4082ee98..13f35afb0cb 100644
--- a/.github/workflows/e2e_scheduling_actions.yaml
+++ b/.github/workflows/e2e_scheduling_actions.yaml
@@ -16,7 +16,7 @@ jobs:
- name: Install Go
uses: actions/setup-go@v2
with:
- go-version: 1.17.x
+ go-version: 1.18.x
- name: Install musl
run: |
@@ -31,7 +31,7 @@ jobs:
- name: Install dependences
run: |
- GO111MODULE="on" go get sigs.k8s.io/kind@v0.11.0
+ GO111MODULE="on" go install sigs.k8s.io/kind@v0.11.0
curl -LO https://storage.googleapis.com/kubernetes-release/release/v1.23.0/bin/linux/amd64/kubectl && sudo install kubectl /usr/local/bin/kubectl
- name: Checkout code
uses: actions/checkout@v2
diff --git a/.github/workflows/e2e_scheduling_basic.yaml b/.github/workflows/e2e_scheduling_basic.yaml
index e5f77e9a033..c96f8d30ee4 100644
--- a/.github/workflows/e2e_scheduling_basic.yaml
+++ b/.github/workflows/e2e_scheduling_basic.yaml
@@ -16,7 +16,7 @@ jobs:
- name: Install Go
uses: actions/setup-go@v2
with:
- go-version: 1.17.x
+ go-version: 1.18.x
- name: Install musl
run: |
@@ -31,7 +31,7 @@ jobs:
- name: Install dependences
run: |
- GO111MODULE="on" go get sigs.k8s.io/kind@v0.11.0
+ GO111MODULE="on" go install sigs.k8s.io/kind@v0.11.0
curl -LO https://storage.googleapis.com/kubernetes-release/release/v1.23.0/bin/linux/amd64/kubectl && sudo install kubectl /usr/local/bin/kubectl
- name: Checkout code
uses: actions/checkout@v2
diff --git a/.github/workflows/e2e_sequence.yaml b/.github/workflows/e2e_sequence.yaml
index bcf058c8165..7c07756d892 100644
--- a/.github/workflows/e2e_sequence.yaml
+++ b/.github/workflows/e2e_sequence.yaml
@@ -16,7 +16,7 @@ jobs:
- name: Install Go
uses: actions/setup-go@v2
with:
- go-version: 1.17.x
+ go-version: 1.18.x
- name: Install musl
run: |
@@ -31,7 +31,7 @@ jobs:
- name: Install dependences
run: |
- GO111MODULE="on" go get sigs.k8s.io/kind@v0.11.0
+ GO111MODULE="on" go install sigs.k8s.io/kind@v0.11.0
curl -LO https://storage.googleapis.com/kubernetes-release/release/v1.23.0/bin/linux/amd64/kubectl && sudo install kubectl /usr/local/bin/kubectl
- name: Checkout code
uses: actions/checkout@v2
diff --git a/.github/workflows/e2e_spark.yaml b/.github/workflows/e2e_spark.yaml
index 45628d728b2..9d3e06f95fe 100644
--- a/.github/workflows/e2e_spark.yaml
+++ b/.github/workflows/e2e_spark.yaml
@@ -31,7 +31,7 @@ jobs:
- name: Install Go
uses: actions/setup-go@v2
with:
- go-version: 1.17.x
+ go-version: 1.18.x
- name: start minikube
run: |
# Use pre-install minikube
diff --git a/.github/workflows/e2e_vcctl.yaml b/.github/workflows/e2e_vcctl.yaml
index fbe83e656ed..ab989087566 100644
--- a/.github/workflows/e2e_vcctl.yaml
+++ b/.github/workflows/e2e_vcctl.yaml
@@ -16,7 +16,7 @@ jobs:
- name: Install Go
uses: actions/setup-go@v2
with:
- go-version: 1.17.x
+ go-version: 1.18.x
- name: Install musl
run: |
@@ -31,7 +31,7 @@ jobs:
- name: Install dependences
run: |
- GO111MODULE="on" go get sigs.k8s.io/kind@v0.11.0
+ GO111MODULE="on" go install sigs.k8s.io/kind@v0.11.0
curl -LO https://storage.googleapis.com/kubernetes-release/release/v1.23.0/bin/linux/amd64/kubectl && sudo install kubectl /usr/local/bin/kubectl
- name: Checkout code
uses: actions/checkout@v2
diff --git a/.github/workflows/fossa.yml b/.github/workflows/fossa.yml
index e296cbb5dbd..55614095aae 100644
--- a/.github/workflows/fossa.yml
+++ b/.github/workflows/fossa.yml
@@ -12,7 +12,7 @@ jobs:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
- go-version: "^1.17.x"
+ go-version: "^1.18.x"
- run: go version
# Runs a set of commands to initialize and analyze with FOSSA
- name: run FOSSA analysis
diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml
index e82c29ab263..85a3c1f2ccf 100644
--- a/.github/workflows/release.yaml
+++ b/.github/workflows/release.yaml
@@ -13,7 +13,7 @@ jobs:
- name: Install Go
uses: actions/setup-go@v2
with:
- go-version: 1.17.x
+ go-version: 1.18.x
- name: Install musl
run: |
diff --git a/README.md b/README.md
index c2030212e1f..3b0ab94d139 100644
--- a/README.md
+++ b/README.md
@@ -135,7 +135,8 @@ Community weekly meeting for Asia: 15:00 - 16:00 (UTC+8) Friday. ([Convert to yo
Community biweekly meeting for America: 08:30 - 09:30 (UTC-8) Thursday. ([Convert to your timezone.](https://www.thetimezoneconverter.com/?t=10%3A00&tz=GMT%2B8&))
-Community biweekly meeting for Europe: 11:00 - 12:00 (UTC+1) Thursday. ([Convert to your timezone.](https://www.thetimezoneconverter.com/?t=10%3A00&tz=GMT%2B8&))
+The community meeting for Europe is now held on demand. If you have ideas or topics to discuss, please leave a message
+in the [slack](https://cloud-native.slack.com/archives/C011GJDQS0N) channel, and the maintainers will contact you to schedule an open meeting.
Resources:
- [Meeting notes and agenda](https://docs.google.com/document/d/1YLbF8zjZBiR9PbXQPB22iuc_L0Oui5A1lddVfRnZrqs/edit)
diff --git a/cmd/scheduler/app/options/options.go b/cmd/scheduler/app/options/options.go
index b95453f7b07..4720c62e643 100644
--- a/cmd/scheduler/app/options/options.go
+++ b/cmd/scheduler/app/options/options.go
@@ -45,7 +45,7 @@ const (
// ServerOption is the main context object for the controller manager.
type ServerOption struct {
KubeClientOptions kube.ClientOptions
- SchedulerName string
+ SchedulerNames []string
SchedulerConf string
SchedulePeriod time.Duration
EnableLeaderElection bool
@@ -83,7 +83,7 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.KubeClientOptions.Master, "master", s.KubeClientOptions.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
fs.StringVar(&s.KubeClientOptions.KubeConfig, "kubeconfig", s.KubeClientOptions.KubeConfig, "Path to kubeconfig file with authorization and master location information")
// volcano scheduler will ignore pods with scheduler names other than specified with the option
- fs.StringVar(&s.SchedulerName, "scheduler-name", defaultSchedulerName, "vc-scheduler will handle pods whose .spec.SchedulerName is same as scheduler-name")
+	fs.StringArrayVar(&s.SchedulerNames, "scheduler-name", []string{defaultSchedulerName}, "vc-scheduler will handle pods whose .spec.SchedulerName matches one of the specified scheduler names")
fs.StringVar(&s.SchedulerConf, "scheduler-conf", "", "The absolute path of scheduler configuration file")
fs.DurationVar(&s.SchedulePeriod, "schedule-period", defaultSchedulerPeriod, "The period between each scheduling cycle")
fs.StringVar(&s.DefaultQueue, "default-queue", defaultQueue, "The default queue name of the job")
diff --git a/cmd/scheduler/app/options/options_test.go b/cmd/scheduler/app/options/options_test.go
index 5f8684050be..16c2a2d99c1 100644
--- a/cmd/scheduler/app/options/options_test.go
+++ b/cmd/scheduler/app/options/options_test.go
@@ -39,7 +39,7 @@ func TestAddFlags(t *testing.T) {
// This is a snapshot of expected options parsed by args.
expected := &ServerOption{
- SchedulerName: defaultSchedulerName,
+ SchedulerNames: []string{defaultSchedulerName},
SchedulePeriod: 5 * time.Minute,
DefaultQueue: defaultQueue,
ListenAddress: defaultListenAddress,
diff --git a/cmd/scheduler/app/server.go b/cmd/scheduler/app/server.go
index 052b503ae08..746a9e16b9a 100644
--- a/cmd/scheduler/app/server.go
+++ b/cmd/scheduler/app/server.go
@@ -31,6 +31,7 @@ import (
"volcano.sh/volcano/pkg/scheduler"
"volcano.sh/volcano/pkg/scheduler/framework"
"volcano.sh/volcano/pkg/signals"
+ commonutil "volcano.sh/volcano/pkg/util"
"volcano.sh/volcano/pkg/version"
v1 "k8s.io/api/core/v1"
@@ -74,7 +75,7 @@ func Run(opt *options.ServerOption) error {
}
sched, err := scheduler.NewScheduler(config,
- opt.SchedulerName,
+ opt.SchedulerNames,
opt.SchedulerConf,
opt.SchedulePeriod,
opt.DefaultQueue,
@@ -115,7 +116,7 @@ func Run(opt *options.ServerOption) error {
// Prepare event clients.
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: leaderElectionClient.CoreV1().Events(opt.LockObjectNamespace)})
- eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: opt.SchedulerName})
+ eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: commonutil.GenerateComponentName(opt.SchedulerNames)})
hostname, err := os.Hostname()
if err != nil {
@@ -126,7 +127,7 @@ func Run(opt *options.ServerOption) error {
rl, err := resourcelock.New(resourcelock.ConfigMapsResourceLock,
opt.LockObjectNamespace,
- opt.SchedulerName,
+ commonutil.GenerateComponentName(opt.SchedulerNames),
leaderElectionClient.CoreV1(),
leaderElectionClient.CoordinationV1(),
resourcelock.ResourceLockConfig{
diff --git a/cmd/webhook-manager/app/options/options.go b/cmd/webhook-manager/app/options/options.go
index fd07b65522b..b0ece916283 100644
--- a/cmd/webhook-manager/app/options/options.go
+++ b/cmd/webhook-manager/app/options/options.go
@@ -47,7 +47,7 @@ type Config struct {
PrintVersion bool
WebhookName string
WebhookNamespace string
- SchedulerName string
+ SchedulerNames []string
WebhookURL string
ConfigPath string
EnabledAdmission string
@@ -79,8 +79,8 @@ func (c *Config) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&c.WebhookNamespace, "webhook-namespace", "", "The namespace of this webhook")
fs.StringVar(&c.WebhookName, "webhook-service-name", "", "The name of this webhook")
fs.StringVar(&c.WebhookURL, "webhook-url", "", "The url of this webhook")
- fs.StringVar(&c.EnabledAdmission, "enabled-admission", defaultEnabledAdmission, "enabled admission webhooks")
- fs.StringVar(&c.SchedulerName, "scheduler-name", defaultSchedulerName, "Volcano will handle pods whose .spec.SchedulerName is same as scheduler-name")
+	fs.StringVar(&c.EnabledAdmission, "enabled-admission", defaultEnabledAdmission, "enabled admission webhooks; if this parameter is modified, make sure the corresponding webhook configurations stay in sync.")
+	fs.StringArrayVar(&c.SchedulerNames, "scheduler-name", []string{defaultSchedulerName}, "Volcano will handle pods whose .spec.SchedulerName matches one of the specified scheduler names")
fs.StringVar(&c.ConfigPath, "admission-conf", "", "The configmap file of this webhook")
fs.StringVar(&c.IgnoredNamespaces, "ignored-namespaces", defaultIgnoredNamespaces, "Comma-separated list of namespaces to be ignored by admission webhooks")
}
diff --git a/cmd/webhook-manager/app/server.go b/cmd/webhook-manager/app/server.go
index 80bc191dbf0..7527e1fdc40 100644
--- a/cmd/webhook-manager/app/server.go
+++ b/cmd/webhook-manager/app/server.go
@@ -32,6 +32,7 @@ import (
"volcano.sh/apis/pkg/apis/scheduling/scheme"
"volcano.sh/volcano/cmd/webhook-manager/app/options"
"volcano.sh/volcano/pkg/kube"
+ commonutil "volcano.sh/volcano/pkg/util"
"volcano.sh/volcano/pkg/version"
wkconfig "volcano.sh/volcano/pkg/webhooks/config"
"volcano.sh/volcano/pkg/webhooks/router"
@@ -65,23 +66,25 @@ func Run(config *options.Config) error {
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
- recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: config.SchedulerName})
+ recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: commonutil.GenerateComponentName(config.SchedulerNames)})
router.ForEachAdmission(config, func(service *router.AdmissionService) {
if service.Config != nil {
service.Config.VolcanoClient = vClient
service.Config.KubeClient = kubeClient
- service.Config.SchedulerName = config.SchedulerName
+ service.Config.SchedulerNames = config.SchedulerNames
service.Config.Recorder = recorder
service.Config.ConfigData = admissionConf
}
klog.V(3).Infof("Registered '%s' as webhook.", service.Path)
http.HandleFunc(service.Path, service.Handler)
-
- klog.V(3).Infof("Registered configuration for webhook <%s>", service.Path)
- registerWebhookConfig(kubeClient, config, service, config.CaCertData)
})
+ if err = addCaCertForWebhook(kubeClient, config.CaCertData); err != nil {
+ return fmt.Errorf("failed to add caCert for webhook %v", err)
+ }
+ klog.V(3).Infof("Successfully added caCert for all webhooks")
+
webhookServeError := make(chan struct{})
stopChannel := make(chan os.Signal, 1)
signal.Notify(stopChannel, syscall.SIGTERM, syscall.SIGINT)
diff --git a/cmd/webhook-manager/app/util.go b/cmd/webhook-manager/app/util.go
index 237a969a933..a8361d3558f 100644
--- a/cmd/webhook-manager/app/util.go
+++ b/cmd/webhook-manager/app/util.go
@@ -17,93 +17,103 @@ limitations under the License.
package app
import (
+ "bytes"
"context"
"crypto/tls"
"crypto/x509"
- "regexp"
- "strings"
+ "fmt"
+ "time"
v1 "k8s.io/api/admissionregistration/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog"
"volcano.sh/apis/pkg/client/clientset/versioned"
"volcano.sh/volcano/cmd/webhook-manager/app/options"
- "volcano.sh/volcano/pkg/webhooks/router"
)
-func registerWebhookConfig(kubeClient *kubernetes.Clientset, config *options.Config, service *router.AdmissionService, caBundle []byte) {
- sideEffect := v1.SideEffectClassNoneOnDryRun
- reviewVersions := []string{"v1"}
- webhookLabelSelector := &metav1.LabelSelector{}
- clientConfig := v1.WebhookClientConfig{
- CABundle: caBundle,
+var (
+ validatingWebhooksName = []string{
+ "volcano-admission-service-jobs-validate",
+ "volcano-admission-service-pods-validate",
+ "volcano-admission-service-queues-validate",
}
- if config.WebhookURL != "" {
- url := config.WebhookURL + service.Path
- clientConfig.URL = &url
- klog.Infof("The URL of webhook manager is <%s>.", url)
+ mutatingWebhooksName = []string{
+ "volcano-admission-service-pods-mutate",
+ "volcano-admission-service-queues-mutate",
+ "volcano-admission-service-podgroups-mutate",
+ "volcano-admission-service-jobs-mutate",
}
- if config.WebhookName != "" && config.WebhookNamespace != "" {
- clientConfig.Service = &v1.ServiceReference{
- Name: config.WebhookName,
- Namespace: config.WebhookNamespace,
- Path: &service.Path,
- }
- klog.Infof("The service of webhook manager is <%s/%s/%s>.",
- config.WebhookName, config.WebhookNamespace, service.Path)
- }
- if config.IgnoredNamespaces != "" {
- ignoredNamespaces := strings.Split(strings.TrimSpace(config.IgnoredNamespaces), ",")
- klog.Infof("The ignored namespaces list of webhook manager is <%v>.",
- ignoredNamespaces)
- webhookLabelSelector = &metav1.LabelSelector{
- MatchExpressions: []metav1.LabelSelectorRequirement{
- {
- Values: ignoredNamespaces,
- Operator: "NotIn",
- Key: "kubernetes.io/metadata.name",
- },
- },
- }
- }
- if service.MutatingConfig != nil {
- for i := range service.MutatingConfig.Webhooks {
- service.MutatingConfig.Webhooks[i].SideEffects = &sideEffect
- service.MutatingConfig.Webhooks[i].AdmissionReviewVersions = reviewVersions
- service.MutatingConfig.Webhooks[i].ClientConfig = clientConfig
- service.MutatingConfig.Webhooks[i].NamespaceSelector = webhookLabelSelector
- }
+)
- service.MutatingConfig.ObjectMeta.Name = webhookConfigName(config.WebhookName, service.Path)
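+// addCaCertForWebhook waits for the webhook configurations created by the installer to appear and patches their CABundle with the generated CA certificate.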
+func addCaCertForWebhook(kubeClient *kubernetes.Clientset, caBundle []byte) error {
+ for _, mutatingWebhookName := range mutatingWebhooksName {
+ var mutatingWebhook *v1.MutatingWebhookConfiguration
+ webhookChanged := false
+ if err := wait.Poll(time.Second, 5*time.Minute, func() (done bool, err error) {
+ mutatingWebhook, err = kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().Get(context.TODO(), mutatingWebhookName, metav1.GetOptions{})
+ if err != nil {
+ if apierrors.IsNotFound(err) {
+ klog.Errorln(err)
+ return false, nil
+ }
+ return false, fmt.Errorf("failed to get mutating webhook %v", err)
+ }
+ return true, nil
+ }); err != nil {
+ return fmt.Errorf("failed to get mutating webhook %v", err)
+ }
- if err := registerMutateWebhook(kubeClient, service.MutatingConfig); err != nil {
- klog.Errorf("Failed to register mutating admission webhook (%s): %v",
- service.Path, err)
- } else {
- klog.V(3).Infof("Registered mutating webhook for path <%s>.", service.Path)
+ for index := 0; index < len(mutatingWebhook.Webhooks); index++ {
+ if mutatingWebhook.Webhooks[index].ClientConfig.CABundle == nil ||
+ !bytes.Equal(mutatingWebhook.Webhooks[index].ClientConfig.CABundle, caBundle) {
+ mutatingWebhook.Webhooks[index].ClientConfig.CABundle = caBundle
+ webhookChanged = true
+ }
}
- }
- if service.ValidatingConfig != nil {
- for i := range service.ValidatingConfig.Webhooks {
- service.ValidatingConfig.Webhooks[i].SideEffects = &sideEffect
- service.ValidatingConfig.Webhooks[i].AdmissionReviewVersions = reviewVersions
- service.ValidatingConfig.Webhooks[i].ClientConfig = clientConfig
- service.ValidatingConfig.Webhooks[i].NamespaceSelector = webhookLabelSelector
+ if webhookChanged {
+ if _, err := kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().Update(context.TODO(), mutatingWebhook, metav1.UpdateOptions{}); err != nil {
+ return fmt.Errorf("failed to update mutating admission webhooks %v %v", mutatingWebhookName, err)
+ }
}
+ }
- service.ValidatingConfig.ObjectMeta.Name = webhookConfigName(config.WebhookName, service.Path)
+ for _, validatingWebhookName := range validatingWebhooksName {
+ var validatingWebhook *v1.ValidatingWebhookConfiguration
+ webhookChanged := false
+ if err := wait.Poll(time.Second, 5*time.Minute, func() (done bool, err error) {
+ validatingWebhook, err = kubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations().Get(context.TODO(), validatingWebhookName, metav1.GetOptions{})
+ if err != nil {
+ if apierrors.IsNotFound(err) {
+ klog.Errorln(err)
+ return false, nil
+ }
+ return false, fmt.Errorf("failed to get validating webhook %v", err)
+ }
+ return true, nil
+ }); err != nil {
+ return fmt.Errorf("failed to get validating webhook %v", err)
+ }
- if err := registerValidateWebhook(kubeClient, service.ValidatingConfig); err != nil {
- klog.Errorf("Failed to register validating admission webhook (%s): %v",
- service.Path, err)
- } else {
- klog.V(3).Infof("Registered validating webhook for path <%s>.", service.Path)
+ for index := 0; index < len(validatingWebhook.Webhooks); index++ {
+ if validatingWebhook.Webhooks[index].ClientConfig.CABundle == nil ||
+ !bytes.Equal(validatingWebhook.Webhooks[index].ClientConfig.CABundle, caBundle) {
+ validatingWebhook.Webhooks[index].ClientConfig.CABundle = caBundle
+ webhookChanged = true
+ }
+ }
+ if webhookChanged {
+ if _, err := kubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations().Update(context.TODO(), validatingWebhook, metav1.UpdateOptions{}); err != nil {
+ return fmt.Errorf("failed to update validating admission webhooks %v %v", validatingWebhookName, err)
+ }
}
}
+
+ return nil
}
// getKubeClient Get a clientset with restConfig.
@@ -164,58 +174,3 @@ func configTLS(config *options.Config, restConfig *rest.Config) *tls.Config {
klog.Fatal("tls: failed to find any tls config data")
return &tls.Config{}
}
-
-func registerMutateWebhook(clientset *kubernetes.Clientset, hook *v1.MutatingWebhookConfiguration) error {
- client := clientset.AdmissionregistrationV1().MutatingWebhookConfigurations()
- existing, err := client.Get(context.TODO(), hook.Name, metav1.GetOptions{})
- if err != nil && !apierrors.IsNotFound(err) {
- return err
- }
- if err == nil && existing != nil {
- klog.V(4).Infof("Updating MutatingWebhookConfiguration %v", hook)
- existing.Webhooks = hook.Webhooks
- if _, err := client.Update(context.TODO(), existing, metav1.UpdateOptions{}); err != nil {
- return err
- }
- } else {
- klog.V(4).Infof("Creating MutatingWebhookConfiguration %v", hook)
- if _, err := client.Create(context.TODO(), hook, metav1.CreateOptions{}); err != nil {
- return err
- }
- }
-
- return nil
-}
-
-func registerValidateWebhook(clientset *kubernetes.Clientset, hook *v1.ValidatingWebhookConfiguration) error {
- client := clientset.AdmissionregistrationV1().ValidatingWebhookConfigurations()
-
- existing, err := client.Get(context.TODO(), hook.Name, metav1.GetOptions{})
- if err != nil && !apierrors.IsNotFound(err) {
- return err
- }
- if err == nil && existing != nil {
- existing.Webhooks = hook.Webhooks
- klog.V(4).Infof("Updating ValidatingWebhookConfiguration %v", hook)
- if _, err := client.Update(context.TODO(), existing, metav1.UpdateOptions{}); err != nil {
- return err
- }
- } else {
- klog.V(4).Infof("Creating ValidatingWebhookConfiguration %v", hook)
- if _, err := client.Create(context.TODO(), hook, metav1.CreateOptions{}); err != nil {
- return err
- }
- }
-
- return nil
-}
-
-func webhookConfigName(name, path string) string {
- if name == "" {
- name = "webhook"
- }
-
- re := regexp.MustCompile(`-+`)
- raw := strings.Join([]string{name, strings.ReplaceAll(path, "/", "-")}, "-")
- return re.ReplaceAllString(raw, "-")
-}
diff --git a/docs/design/distributed-framework-plugins.md b/docs/design/distributed-framework-plugins.md
index c38b9dc775b..fa7ed0ca056 100644
--- a/docs/design/distributed-framework-plugins.md
+++ b/docs/design/distributed-framework-plugins.md
@@ -61,7 +61,7 @@ With the introduction of distributed pattern in various frameworks, we can imple
The key implementation of tensorflow plugin is that how to set correct `TF_CONFIG` environment variable for every pod.
-Firstly, we must known the cluster role of task in volcano job, and the port to be exposed. And this information can be passed by plugin arguments, which is defined in job spec.
+First, we must know the cluster role of each task in the volcano job and the port to be exposed. This information can be passed via plugin arguments, which are defined in the job spec.
```yaml
spec:
@@ -70,8 +70,6 @@ spec:
tensorflow: ["--port=5000", "--worker=worker", "--ps=ps"]
```
-
-
In the implementation of `tensorflowPlugin`, these arguments will be parsed.
```go
@@ -99,8 +97,6 @@ func (tp *tensorflowPlugin) addFlags() {
}
```
-
-
And then patch the pod spec in method `OnPodCreate`.
```go
@@ -129,8 +125,6 @@ func (tp *tensorflowPlugin) OnPodCreate(pod *v1.Pod, job *batch.Job) error {
}
```
-
-
Here is the structure of `TF_CONFIG`:
```go
@@ -199,6 +193,37 @@ func (tp *tensorflowPlugin) getClusterInfo(job *batch.Job) clusterInfo {
}
```
+#### Pytorch Plugin
+
+Similar to the tensorflow plugin, we must first know the cluster role of each task in the volcano job and the port to be exposed. This information can be passed via plugin arguments, which are defined in the job spec.
+
+```yaml
+spec:
+ plugins:
+ # set pytorch plugin
+ pytorch: ["--master=master","--worker=worker","--port=23456"]
+```
+
+In the implementation of `pytorchPlugin`, these arguments will be parsed.
+
+```go
+// pytorchPlugin is plugin for pytorch framework
+type pytorchPlugin struct {
+ pytorchArguments []string
+ clientset pluginsinterface.PluginClientset
+ masterName string
+ workerName string
+ port int
+}
+```
+
+Then, in the `OnPodCreate` method, we patch the environment variables required for PyTorch distributed training into the container envs, as sketched below.
+The main environment variables are:
+* `MASTER_ADDR`: the master address
+* `MASTER_PORT`: the master port
+* `WORLD_SIZE`: the total number of nodes
+* `RANK`: the index of the current node
+
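+A minimal sketch of what this env patching could look like is shown below. It is an illustration only; helpers such as `getTotalReplicas` and `getTaskIndex`, and the exact master address format, are assumptions rather than the actual plugin code.
+
+```go
+// Hypothetical sketch: inject the PyTorch distributed-training envs into every container of the pod.
+func (pp *pytorchPlugin) patchEnvs(pod *v1.Pod, job *batch.Job) {
+	masterAddr := fmt.Sprintf("%s-%s-0.%s", job.Name, pp.masterName, job.Name) // assumed headless-service style host name
+	worldSize := getTotalReplicas(job)                                         // assumed helper: sum of master and worker replicas
+	rank := getTaskIndex(pod)                                                  // assumed helper: index parsed from the pod name
+
+	envs := []v1.EnvVar{
+		{Name: "MASTER_ADDR", Value: masterAddr},
+		{Name: "MASTER_PORT", Value: strconv.Itoa(pp.port)},
+		{Name: "WORLD_SIZE", Value: strconv.Itoa(worldSize)},
+		{Name: "RANK", Value: strconv.Itoa(rank)},
+	}
+	for i := range pod.Spec.Containers {
+		pod.Spec.Containers[i].Env = append(pod.Spec.Containers[i].Env, envs...)
+	}
+}
+```
+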
#### Other Framework
Most of other frameworks is similar to Tensorflow. But the MPI framework is special. In most case, It needs a `hostfile`, e.g. :
diff --git a/docs/design/images/jobflow-1.png b/docs/design/images/jobflow-1.png
new file mode 100644
index 00000000000..94bd0f6db87
Binary files /dev/null and b/docs/design/images/jobflow-1.png differ
diff --git a/docs/design/images/jobflow-2.jpg b/docs/design/images/jobflow-2.jpg
new file mode 100644
index 00000000000..ac988dfa672
Binary files /dev/null and b/docs/design/images/jobflow-2.jpg differ
diff --git a/docs/design/images/jobflow-3.png b/docs/design/images/jobflow-3.png
new file mode 100644
index 00000000000..9aa3362d223
Binary files /dev/null and b/docs/design/images/jobflow-3.png differ
diff --git a/docs/design/images/jobflow-4.png b/docs/design/images/jobflow-4.png
new file mode 100644
index 00000000000..a94e2a7a6a8
Binary files /dev/null and b/docs/design/images/jobflow-4.png differ
diff --git a/docs/design/images/jobflow.gif b/docs/design/images/jobflow.gif
new file mode 100644
index 00000000000..0dfd77ba753
Binary files /dev/null and b/docs/design/images/jobflow.gif differ
diff --git a/docs/design/jobflow/README.md b/docs/design/jobflow/README.md
new file mode 100755
index 00000000000..6eef4b18767
--- /dev/null
+++ b/docs/design/jobflow/README.md
@@ -0,0 +1,368 @@
+# JobFlow
+
+## Introduction
+
+To solve the problem of inter-job dependencies, many VCJobs need to cooperate with each other, and today they must be orchestrated manually or by another job orchestration platform to get the work done. We present a new way of orchestrating VCJobs called JobFlow. It introduces two concepts for running multiple batch jobs automatically, JobTemplate and JobFlow, so end users can easily declare their jobs and run them using complex control primitives, for example sequential or parallel execution, if-then-else statements, switch-case statements, loop execution and so on.
+
+JobFlow helps migrate AI, BigData and HPC workloads to the cloud-native world. Though there are already some workflow engines, they are not designed for batch job workloads, which typically have complex running dependencies and take a long time to run, for example days or weeks. JobFlow lets end users declare their jobs as a jobTemplate and then reuse them accordingly. JobFlow also orchestrates those jobs using complex control primitives and launches them automatically, which can significantly reduce the time consumption of a complex job and improve resource utilization. Finally, JobFlow is not a general-purpose workflow engine; it knows the details of VCJobs, so end users can better understand their jobs, for example a job's running state, beginning and ending timestamps, the next jobs to run, pod failure ratio and so on.
+
+## Scope
+
+### In Scope
+- Define the API of JobFlow
+- Define the behaviour of JobFlow
+- Start sequence between multiple jobs
+- Dependency completion state of the job start sequence
+- DAG-based job dependency startup
+
+### Out of Scope
+- Support for job types other than vcjob
+- Gang scheduling across the vcjobs of a JobFlow
+
+## Scenarios
+
+- Some jobs can only run after a previous job has completed (or reached some other status); otherwise, the correct result cannot be calculated.
+- Sometimes inter-job dependencies also require diverse dependency types, such as conditional dependencies, circular dependencies, probes, and so on.
+
+
+
+## Design
+
+
+
+
+
+The blue parts are Kubernetes components, the green and brown parts are Volcano components, and the yellow parts are Volcano CRD resources.
+
+The complete process of submitting jobs through JobFlow:
+
+1. After passing admission, kubectl creates the JobTemplate and JobFlow (Volcano CRD) objects in kube-apiserver.
+
+2. The JobFlowController uses the JobTemplate as a template according to the configuration of the JobFlow, and creates the corresponding VcJob according to the flow dependency rules.
+
+3. After VcJob is created, VcJobController creates corresponding Pods and podgroups according to the configuration of VcJob.
+
+4. After Pod and PodGroup are created, vc-scheduler will go to kube-apiserver to get Pod/PodGroup and node information.
+
+5. After obtaining the information, vc-scheduler will select the appropriate node for each Pod according to its configured scheduling policy.
+
+6. After assigning nodes to Pods, kubelet will get the Pod's configuration from kube-apiserver and start the corresponding containers.
+
+
+
+### Controller
+
+
+
+### Webhook
+
+```
+Create a JobFlow check:
+1. There cannot be two templates with the same name in a JobFlow dependency.
+   E.g.: A -> B -> A -> C   (A appears twice)
+2. Closed loops cannot occur in JobFlow.
+   E.g.: A -> B -> C
+               ^    |
+               |    v
+               +--- D
+
+Create a JobTemplate check (following the vcjob parameter specification):
+E.g.: job minAvailable must be greater than or equal to zero
+      job maxRetry must be greater than or equal to zero
+      tasks cannot be empty, and cannot have tasks with the same name
+      the number of task replicas cannot be less than zero
+      task minAvailable cannot be greater than task replicas ...
+```
+
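+The closed-loop check above can be implemented as a depth-first search over the `dependsOn` graph. The following Go sketch only illustrates the idea and is not the actual admission code; the `deps` map built from the JobFlow spec is an assumption.
+
+```go
+// Hypothetical sketch: detect a closed loop in the JobFlow dependency graph with a three-color DFS.
+// deps maps a template name to the names listed in its dependsOn.targets.
+func hasCycle(deps map[string][]string) bool {
+	const (
+		white = iota // unvisited
+		gray         // on the current DFS path
+		black        // fully explored
+	)
+	color := map[string]int{}
+	var visit func(name string) bool
+	visit = func(name string) bool {
+		color[name] = gray
+		for _, dep := range deps[name] {
+			switch color[dep] {
+			case gray:
+				return true // back edge: a closed loop exists
+			case white:
+				if visit(dep) {
+					return true
+				}
+			}
+		}
+		color[name] = black
+		return false
+	}
+	for name := range deps {
+		if color[name] == white && visit(name) {
+			return true
+		}
+	}
+	return false
+}
+```
+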
+### JobFlow
+
+#### Introduction
+
+JobFlow defines the running flow of a set of jobs. Fields in JobFlow define how jobs are orchestrated.
+
+JobFlow is abbreviated as jf, and the resource can be viewed through `kubectl get jf`.
+
+JobFlow aims to realize dependency-driven operation between vcjobs in Volcano: vcjobs are issued according to the dependencies between them.
+
+#### Key Fields
+
+##### Top-Level Attributes
+
+The top-level attributes of a jobflow define its apiVersion, kind, metadata and spec.
+
+| Attribute | Type | Required | Default Value | Description |
+| ------------ | ----------------------- | -------- | -------------------------- | ------------------------------------------------------------ |
+| `apiVersion` | `string`                 | Y        | `flow.volcano.sh/v1alpha1` | A string that identifies the version of the schema the object should have. The core types use `flow.volcano.sh/v1alpha1` in this version of the documentation. |
+| `kind` | `string` | Y | `JobFlow` | Must be `JobFlow` |
+| `metadata` | [`Metadata`](#Metadata) | Y | | Information about the JobFlow resource. |
+| `spec` | [`Spec`](#spec) | Y | | A specification for the JobFlow resource attributes. |
+| `status` | [`Status`](#Status) | Y | | A specification for the JobFlow status attributes. |
+
+
+
+##### Metadata
+
+Metadata provides basic information about the JobFlow.
+
+| Attribute | Type | Required | Default Value | Description |
+| ------------- | ------------------- | -------- | ------------- | ------------------------------------------------------------ |
+| `name` | `string` | Y | | A name for the schematic. `name` is subject to the restrictions listed beneath this table. |
+| `namespace` | `string` | Y | | A namespace for the schematic. `namespace` is subject to the restrictions listed beneath this table. |
+| `labels` | `map[string]string` | N | | A set of string key/value pairs used as arbitrary labels on this component. Labels follow the [Kubernetes specification](https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/). |
+| `annotations` | `map[string]string` | N        |               | A set of string key/value pairs used as arbitrary descriptive text associated with this object. Annotations follow the [Kubernetes specification](https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/#syntax-and-character-set). |
+
+
+
+##### Spec
+
+The spec of a JobFlow defines how the referenced job templates are orchestrated and whether the generated jobs are retained.
+
+| Attribute | Type | Required | Default Value | Description |
+| ----------------- | ------------------------------------ | -------- | ------------- | ------------------------------------------------------------ |
+| `flows` | [`Flow array`](#Flow) | Y | | Describes the dependencies between vcjobs. |
+| `jobRetainPolicy` | `string`                              | Y        | retain        | After the JobFlow succeeds, keep the generated jobs (`retain`) or delete them (`delete`). |
+
+
+
+##### Flow
+
+| Attribute | Type | Required | Default Value | Description |
+| ----------------- | ------------------------------------ | -------- | ------------- | ------------------------------------------------------------ |
+| `name` | `string` | Y | | JobTemplate name |
+| `dependsOn` | [`DependsOn`](#DependsOn) | Y | | JobTemplate dependencies |
+| `patch` | [`Patch`](#Patch) | N | | Patch JobTemplate |
+
+
+
+##### DependsOn
+
+| Attribute | Type | Required | Default Value | Description |
+| ----------------- | ------------------------------------ | -------- | ------------- | ------------------------------------------------------------ |
+| `targets` | `string array` | Y | | All jobtemplate names that JobTemplate depends on |
+| `probe` | [`Probe`](#Probe) | N | | Probe Type Dependency |
+| `strategy` | `string` | Y | all | Whether the dependencies need to be all satisfied |
+
+
+
+##### Patch
+
+| Attribute | Type | Required | Default Value | Description |
+| ----------------- | ------------------------------------ | -------- | ------------- | ------------------------------------------------------------ |
+| `spec` | `spec` | Y | | Patch the contents of the jobtemplate's spec |
+
+
+
+##### Probe
+
+| Attribute | Type | Required | Default Value | Description |
+| ----------------- | ------------------------------------ | -------- | ------------- | ------------------------------------------------------------ |
+| `httpGetList` | [`HttpGet array`](#HttpGet) | N | | HttpGet type dependencies |
+| `tcpSocketList` | [`TcpSocket array`](#TcpSocket) | N | | TcpSocket type dependencies |
+| `taskStatusList` | [`TaskStatus array`](#TaskStatus) | N | | TaskStatus type dependencies |
+
+
+
+##### HttpGet
+
+| Attribute | Type | Required | Default Value | Description |
+| ----------------- | ------------------------------------ | -------- | ------------- | ------------------------------------------------------------ |
+| `TaskName` | `string` | Y | | The name of the task under vcjob |
+| `Path`            | `string`                              | Y        |               | The path of httpget |
+| `Port` | `int` | Y | | The port of httpget |
+| `httpHeader` | `HTTPHeader` | N | | The httpHeader of httpget |
+
+
+
+##### TcpSocket
+
+| Attribute | Type | Required | Default Value | Description |
+| ----------------- | ------------------------------------ | -------- | ------------- | ------------------------------------------------------------ |
+| `TaskName` | `string` | Y | | The name of the task under vcjob |
+| `Port` | `int` | Y | | The port of TcpSocket |
+
+
+
+##### TaskStatus
+
+| Attribute | Type | Required | Default Value | Description |
+| ----------------- | ------------------------------------ | -------- | ------------- | ------------------------------------------------------------ |
+| `TaskName` | `string` | Y | | The name of the task under vcjob |
+| `Phase` | `string` | Y | | The phase of task |
+
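+As an illustration of how these probe fields combine in a flow entry, a sketch is shown below; the field casing follows the tables above and has not been verified against the actual CRD schema.
+
+```yaml
+flows:
+  - name: b
+    dependsOn:
+      targets: ['a']
+      strategy: all
+      probe:
+        httpGetList:
+          - TaskName: default-nginx   # a task of the vcjob created from template "a"
+            Path: /healthz
+            Port: 80
+        taskStatusList:
+          - TaskName: default-nginx
+            Phase: Running
+```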
+
+
+##### Status
+
+| Attribute | Type | Required | Default Value | Description |
+| ----------------- | ------------------------------------ | -------- | ------------- | ------------------------------------------------------------ |
+| `pendingJobs` | `string array` | N | | Vcjobs in pending state |
+| `runningJobs` | `string array` | N | | Vcjobs in running state |
+| `failedJobs` | `string array` | N | | Vcjobs in failed state |
+| `completedJobs` | `string array` | N | | Vcjobs in completed and completing state |
+| `terminatedJobs` | `string array` | N | | Vcjobs in terminated and terminating state |
+| `unKnowJobs`      | `string array`                        | N        |               | Vcjobs in unknown state                                      |
+| `jobStatusList` | [`JobStatus array`](#JobStatus) | N | | Status information of all split vcjobs |
+| `conditions` | [`map[string]Condition`](#Condition) | N | | It is used to describe the current state, creation time, completion time and information of all vcjobs. The vcjob state here additionally adds the waiting state to describe the vcjob whose dependencies do not meet the requirements. |
+| `state` | [`State`](#State) | N | | State of JobFlow |
+
+
+
+##### JobStatus
+
+| Attribute | Type | Required | Default Value | Description |
+| ----------------- | ------------------------------------ | -------- | ------------- | ------------------------------------------------------------ |
+| `name` | `string` | N | | Name of vcjob |
+| `state` | `string` | N | | State of vcJob |
+| `startTimestamp` | `Time` | N | | StartTimestamp of vcjob |
+| `endTimestamp` | `Time` | N | | EndTimestamp of vcjob |
+| `restartCount` | `int32` | N | | RestartCount of vcjob |
+| `runningHistories` | [`JobRunningHistory array`](#JobRunningHistory) | N | | Historical information of various states of vcjob |
+
+
+
+##### Condition
+
+| Attribute | Type | Required | Default Value | Description |
+| ----------------- | ------------------------------------ | -------- | ------------- | ------------------------------------------------------------ |
+| `phase` | `string` | N | | phase of vcjob |
+| `createTime` | `Time` | N | | CreateTime of vcjob |
+| `runningDuration` | `Duration` | N | | RunningDuration of vcjob |
+| `taskStatusCount` | `map[string]TaskState` | N | | The number of tasks in different states |
+
+
+
+##### State
+
+| Attribute | Type | Required | Default Value | Description |
+| ----------------- | ------------------------------------ | -------- | ------------- | ------------------------------------------------------------ |
+| `phase`           | `string`                              | N        |               | Succeed: all vcjobs have reached the completed state. Terminating: the JobFlow is being deleted. Failed: a vcjob in the flow is in the failed state, so the remaining vcjobs in the flow cannot continue to be delivered. Running: the flow contains a vcjob in the Running state. Pending: the flow contains no vcjob in the Running state. |
+
+
+
+##### JobRunningHistory
+
+| Attribute | Type | Required | Default Value | Description |
+| ----------------- | ------------------------------------ | -------- | ------------- | ------------------------------------------------------------ |
+| `startTimestamp` | `Time` | N | | The start time of a certain state of the vcjob |
+| `endTimestamp` | `Time` | N | | The end time of a certain state of the vcjob |
+| `state` | `string` | N | | Vcjob status |
+
+JobFlow supports patching the referenced JobTemplate. An example of a patch in a JobFlow is as follows:
+
+```yaml
+apiVersion: flow.volcano.sh/v1alpha1
+kind: JobFlow
+metadata:
+ name: test
+ namespace: default
+spec:
+ jobRetainPolicy: delete
+ flows:
+ - name: a
+ patch:
+ spec:
+ tasks:
+ - name: "default-nginx"
+ template:
+ spec:
+ containers:
+ - name: nginx
+ command:
+ - sh
+ - -c
+ - sleep 10s
+```
+
+Here is an example of jobflow:
+
+[the sample file of JobFlow](../../../example/jobflow/JobFlow.yaml)
+
+### JobTemplate
+
+#### Introduction
+
+* JobTemplate is the template of a vcjob. After a JobTemplate is created, it is not processed by the vc-controller the way a vcjob is; it waits to be referenced by a JobFlow.
+* A JobFlow can reference multiple jobtemplates.
+* A jobtemplate can be referenced by multiple jobflows.
+* A JobTemplate can be converted to and from a vcjob.
+* JobTemplate is abbreviated as jt, and the resource can be viewed through `kubectl get jt`.
+* The difference between a jobtemplate and a vcjob is that a jobtemplate is not issued by the job controller; a jobflow can directly reference the name of a JobTemplate to issue the corresponding vcjob.
+* JobFlow supports making changes to a jobtemplate when referencing it.
+
+#### Definition
+
+#### Key Fields
+
+##### Top-Level Attributes
+
+The top-level attributes of a jobtemplate define its apiVersion, kind, metadata and spec.
+
+| Attribute | Type | Required | Default Value | Description |
+| ------------ | ----------------------- | -------- | -------------------------- | ------------------------------------------------------------ |
+| `apiVersion` | `string`                 | Y        | `flow.volcano.sh/v1alpha1` | A string that identifies the version of the schema the object should have. The core types use `flow.volcano.sh/v1alpha1` in this version of the documentation. |
+| `kind` | `string` | Y | `JobTemplate` | Must be `JobTemplate` |
+| `metadata` | [`Metadata`](#JobTemplateMetadata) | Y | | Information about the JobTemplate resource. |
+| `spec` | [`Spec`](#JobTemplateSpec) | Y | | A specification for the JobTemplate resource attributes. |
+| `status`     | [`Status`](#JobTemplateStatus) | Y        |                            | A specification for the JobTemplate status attributes.         |
+
+
+
+##### Metadata
+
+Metadata provides basic information about the JobTemplate.
+
+| Attribute | Type | Required | Default Value | Description |
+| ------------- | ------------------- | -------- | ------------- | ------------------------------------------------------------ |
+| `name` | `string` | Y | | A name for the schematic. `name` is subject to the restrictions listed beneath this table. |
+| `namespace` | `string` | Y | | A namespace for the schematic. `namespace` is subject to the restrictions listed beneath this table. |
+| `labels` | `map[string]string` | N | | A set of string key/value pairs used as arbitrary labels on this component. Labels follow the [Kubernetes specification](https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/). |
+| `annotations` | `map[string]string` | N        |               | A set of string key/value pairs used as arbitrary descriptive text associated with this object. Annotations follow the [Kubernetes specification](https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/#syntax-and-character-set). |
+
+
+
+##### JobTemplateSpec
+
+The spec of jobtemplate directly follows the spec of vcjob.
+
+
+
+##### JobTemplateStatus
+| Attribute | Type | Required | Default Value | Description |
+| ----------------- | ------------------------------------ | -------- | ------------- | ------------------------------------------------------------ |
+| `jobDependsOnList` | `string array` | Y | | Vcjobs created by this jobtemplate as a template. |
+
+You can view [the sample file of JobTemplate](../../../example/jobflow/JobTemplate.yaml)
+
+## JobFlow task scheduling
+
+
+
+## Demo video
+
+https://www.bilibili.com/video/BV1c44y1Y7FX
+
+## Usage
+
+- Create the jobTemplate that needs to be used
+- Create a jobflow. The flow field of the jobflow is filled with the corresponding jobtemplate used to create a vcjob.
+- The field jobRetainPolicy indicates whether to delete the vcjobs created by the jobflow after the jobflow succeeds (delete/retain); the default is retain.
+
+## JobFlow Features
+
+### Features that have been implemented
+
+* Create JobFlow and JobTemplate CRD
+* Support sequential start of vcjob
+* Support vcjob to depend on other vcjobs to start
+* Support the conversion of vcjob and JobTemplate to each other
+* Supports viewing of the running status of JobFlow
+
+### Features not yet implemented
+
+* JobFlow supports making changes to jobtemplate when referencing jobtemplate
+* `if` statements
+* `switch` statements
+* `for` statements
+* Support job failure retry in JobFlow
+* Integration with volcano-scheduler
+* Support for scheduling plugins at JobFlow level
\ No newline at end of file
diff --git a/docs/user-guide/how_to_use_pytorch_plugin.md b/docs/user-guide/how_to_use_pytorch_plugin.md
new file mode 100644
index 00000000000..6d1c196dd6e
--- /dev/null
+++ b/docs/user-guide/how_to_use_pytorch_plugin.md
@@ -0,0 +1,60 @@
+# Pytorch Plugin User Guide
+
+## Introduction
+
+**Pytorch plugin** is designed to optimize the user experience when running pytorch jobs: it not only allows users to write less yaml, but also ensures that Pytorch jobs run correctly.
+
+## How the Pytorch Plugin Works
+
+The Pytorch Plugin will do three things:
+
+* Open the ports used by Pytorch for all containers of the job
+* Force-enable the `svc` plugin
+* Automatically add the environment variables that pytorch distributed training needs, such as `MASTER_ADDR`, `MASTER_PORT`, `WORLD_SIZE` and `RANK`, to the containers
+
+## Parameters of the Pytorch Plugin
+
+### Arguments
+
+| ID | Name | Type | Default Value | Required | Description | Example |
+| ---- | ------ | ------ | ------------- | -------- | ---------------------------------- | ------------------ |
+| 1 | master | string | master | No | Name of Pytorch master | --master=master |
+| 2 | worker | string | worker | No | Name of Pytorch worker | --worker=worker |
+| 3 | port | string | 23456 | No | The port to open for the container | --port=23456 |
+
+## Examples
+
+```yaml
+apiVersion: batch.volcano.sh/v1alpha1
+kind: Job
+metadata:
+ name: pytorch-job
+spec:
+ minAvailable: 1
+ schedulerName: volcano
+ plugins:
+    pytorch: ["--master=master","--worker=worker","--port=23456"] # register the Pytorch plugin
+ tasks:
+ - replicas: 1
+ name: master
+ policies:
+ - event: TaskCompleted
+ action: CompleteJob
+ template:
+ spec:
+ containers:
+ - image: gcr.io/kubeflow-ci/pytorch-dist-sendrecv-test:1.0
+ imagePullPolicy: IfNotPresent
+ name: master
+ restartPolicy: OnFailure
+ - replicas: 2
+ name: worker
+ template:
+ spec:
+ containers:
+ - image: gcr.io/kubeflow-ci/pytorch-dist-sendrecv-test:1.0
+ imagePullPolicy: IfNotPresent
+ name: worker
+ workingDir: /home
+ restartPolicy: OnFailure
+```
\ No newline at end of file
diff --git a/example/jobflow/JobFlow.yaml b/example/jobflow/JobFlow.yaml
new file mode 100755
index 00000000000..66e79850567
--- /dev/null
+++ b/example/jobflow/JobFlow.yaml
@@ -0,0 +1,22 @@
+apiVersion: flow.volcano.sh/v1alpha1
+kind: JobFlow
+metadata:
+ name: test
+ namespace: default
+spec:
+  jobRetainPolicy: delete # retain: keep the generated jobs after the JobFlow succeeds; delete: remove them
+ flows:
+ - name: a
+ - name: b
+ dependsOn:
+ targets: ['a']
+ - name: c
+ dependsOn:
+ targets: ['b']
+ - name: d
+ dependsOn:
+ targets: ['b']
+ - name: e
+ dependsOn:
+ targets: ['c','d']
+
\ No newline at end of file
diff --git a/example/jobflow/JobTemplate.yaml b/example/jobflow/JobTemplate.yaml
new file mode 100755
index 00000000000..01e5150985d
--- /dev/null
+++ b/example/jobflow/JobTemplate.yaml
@@ -0,0 +1,184 @@
+apiVersion: flow.volcano.sh/v1alpha1
+kind: JobTemplate
+metadata:
+ name: a
+spec:
+ minAvailable: 1
+ schedulerName: volcano
+ priorityClassName: high-priority
+ policies:
+ - event: PodEvicted
+ action: RestartJob
+ plugins:
+ ssh: []
+ env: []
+ svc: []
+ maxRetry: 5
+ queue: default
+ tasks:
+ - replicas: 1
+ name: "default-nginx"
+ template:
+ metadata:
+ name: web
+ spec:
+ containers:
+ - image: nginx:1.14.2
+ command:
+ - sh
+ - -c
+ - sleep 10s
+ imagePullPolicy: IfNotPresent
+ name: nginx
+ resources:
+ requests:
+ cpu: "1"
+ restartPolicy: OnFailure
+---
+apiVersion: flow.volcano.sh/v1alpha1
+kind: JobTemplate
+metadata:
+ name: b
+spec:
+ minAvailable: 1
+ schedulerName: volcano
+ priorityClassName: high-priority
+ policies:
+ - event: PodEvicted
+ action: RestartJob
+ plugins:
+ ssh: []
+ env: []
+ svc: []
+ maxRetry: 5
+ queue: default
+ tasks:
+ - replicas: 1
+ name: "default-nginx"
+ template:
+ metadata:
+ name: web
+ spec:
+ containers:
+ - image: nginx:1.14.2
+ command:
+ - sh
+ - -c
+ - sleep 10s
+ imagePullPolicy: IfNotPresent
+ name: nginx
+ resources:
+ requests:
+ cpu: "1"
+ restartPolicy: OnFailure
+---
+apiVersion: flow.volcano.sh/v1alpha1
+kind: JobTemplate
+metadata:
+ name: c
+spec:
+ minAvailable: 1
+ schedulerName: volcano
+ priorityClassName: high-priority
+ policies:
+ - event: PodEvicted
+ action: RestartJob
+ plugins:
+ ssh: []
+ env: []
+ svc: []
+ maxRetry: 5
+ queue: default
+ tasks:
+ - replicas: 1
+ name: "default-nginx"
+ template:
+ metadata:
+ name: web
+ spec:
+ containers:
+ - image: nginx:1.14.2
+ command:
+ - sh
+ - -c
+ - sleep 10s
+ imagePullPolicy: IfNotPresent
+ name: nginx
+ resources:
+ requests:
+ cpu: "1"
+ restartPolicy: OnFailure
+---
+apiVersion: flow.volcano.sh/v1alpha1
+kind: JobTemplate
+metadata:
+ name: d
+spec:
+ minAvailable: 1
+ schedulerName: volcano
+ priorityClassName: high-priority
+ policies:
+ - event: PodEvicted
+ action: RestartJob
+ plugins:
+ ssh: []
+ env: []
+ svc: []
+ maxRetry: 5
+ queue: default
+ tasks:
+ - replicas: 1
+ name: "default-nginx"
+ template:
+ metadata:
+ name: web
+ spec:
+ containers:
+ - image: nginx:1.14.2
+ command:
+ - sh
+ - -c
+ - sleep 10s
+ imagePullPolicy: IfNotPresent
+ name: nginx
+ resources:
+ requests:
+ cpu: "1"
+ restartPolicy: OnFailure
+---
+apiVersion: flow.volcano.sh/v1alpha1
+kind: JobTemplate
+metadata:
+ name: e
+spec:
+ minAvailable: 1
+ schedulerName: volcano
+ priorityClassName: high-priority
+ policies:
+ - event: PodEvicted
+ action: RestartJob
+ plugins:
+ ssh: []
+ env: []
+ svc: []
+ maxRetry: 5
+ queue: default
+ tasks:
+ - replicas: 1
+ name: "default-nginx"
+ template:
+ metadata:
+ name: web
+ spec:
+ containers:
+ - image: nginx:1.14.2
+ command:
+ - sh
+ - -c
+ - sleep 10s
+ imagePullPolicy: IfNotPresent
+ name: nginx
+ resources:
+ requests:
+ cpu: "1"
+ restartPolicy: OnFailure
\ No newline at end of file
diff --git a/example/jobflow/README.md b/example/jobflow/README.md
new file mode 100644
index 00000000000..a262c2910bd
--- /dev/null
+++ b/example/jobflow/README.md
@@ -0,0 +1,63 @@
+## JobFlow
+
+#### These examples show how to run JobFlow via Volcano.
+
+[JobFlow](../../docs/design/jobflow) is a workflow engine based on the Volcano Job. It proposes two concepts to automate running multiple batch jobs, named JobTemplate and JobFlow, so end users can easily declare their jobs and run them using complex control primitives such as sequential or parallel execution, if-then-else statements, switch-case statements, loop execution, etc.
+
+Read the design [here](../../docs/design/jobflow).
+
+### Prerequisites
+
+- docker: `18.06`
+- Kubernetes: >`1.17`
+
+## Startup steps
+
+Build the image locally:
+```bash
+# get volcano and jobflow source code from github
+git clone http://github.com/volcano-sh/volcano.git
+git clone https://github.com/BoCloud/JobFlow.git
+
+# build image beyondcent/jobflow:v0.0.1 from local
+cd JobFlow
+make
+make docker-build
+```
+
+##### Deploy JobFlow from [here](https://github.com/BoCloud/JobFlow#deploy)
+```bash
+kubectl apply -f https://raw.githubusercontent.com/BoCloud/JobFlow/main/deploy/jobflow.yaml
+```
+
+##### Deploy Volcano from [here](https://volcano.sh/en/docs/installation/#install-with-yaml-files)
+```bash
+kubectl apply -f https://raw.githubusercontent.com/volcano-sh/volcano/master/installer/volcano-development.yaml
+```
+
+If the cert of `jobflow-webhook-service.kube-system.svc` has expired, generate a new one to replace it.
+```bash
+# delete expired cert in secrets
+kubectl delete secret jobflow-webhook-server-cert -nkube-system
+
+# use gen-admission-secret.sh to register a new secret
+cd volcano
+./installer/dockerfile/webhook-manager/gen-admission-secret.sh --service jobflow-webhook-service --namespace kube-system --secret jobflow-webhook-server-cert
+
+# restart jobflow-controller-manager
+kubectl delete pod/jobflow-controller-manager-67847d59dd-j8dmc -nkube-system
+```
+
+##### Run the jobflow example
+```bash
+# deploy jobTemplate first
+cd volcano
+kubectl apply -f example/jobflow/JobTemplate.yaml
+# deploy jobFlow second
+kubectl apply -f example/jobflow/JobFlow.yaml
+
+# check them
+kubectl get jt
+kubectl get jf
+kubectl get po
+```
\ No newline at end of file
diff --git a/go.mod b/go.mod
index d8ad9f2f463..ab010057bdb 100644
--- a/go.mod
+++ b/go.mod
@@ -1,6 +1,6 @@
module volcano.sh/volcano
-go 1.17
+go 1.18
require (
github.com/agiledragon/gomonkey/v2 v2.1.0
diff --git a/go.sum b/go.sum
index d4362f8cd18..cf0852c4320 100644
--- a/go.sum
+++ b/go.sum
@@ -105,7 +105,6 @@ github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx2
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/certifi/gocertifi v0.0.0-20191021191039-0944d244cd40/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
-github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
@@ -143,7 +142,6 @@ github.com/coreos/go-oidc v2.1.0+incompatible/go.mod h1:CgnwVTmzoESiwO9qyAFEMiHo
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
-github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e h1:Wf6HqHfScWJN9/ZjdUKyjop4mf3Qdd+1TvvltAvM3m8=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd/v22 v22.1.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk=
github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI=
@@ -475,7 +473,6 @@ github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108
github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY=
github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
-github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
github.com/onsi/ginkgo/v2 v2.0.0 h1:CcuG/HvWNkkaqCUpJifQY8z7qEMBJya6aLPx6ftGyjQ=
github.com/onsi/ginkgo/v2 v2.0.0/go.mod h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c=
github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
@@ -890,7 +887,6 @@ golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20211029165221-6e7872819dc8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 h1:XfKQ4OlFl8okEOr5UvAqFRVj8pY/4yfcXrddB8qAbU0=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
diff --git a/hack/generate-yaml.sh b/hack/generate-yaml.sh
index b707865972a..e36ba7b55e6 100755
--- a/hack/generate-yaml.sh
+++ b/hack/generate-yaml.sh
@@ -118,6 +118,7 @@ ${HELM_BIN_DIR}/helm template ${VK_ROOT}/installer/helm/chart/volcano --namespac
-s templates/scheduling_v1beta1_podgroup.yaml \
-s templates/scheduling_v1beta1_queue.yaml \
-s templates/nodeinfo_v1alpha1_numatopologies.yaml \
+ -s templates/webhooks.yaml \
>> ${DEPLOYMENT_FILE}
${HELM_BIN_DIR}/helm template ${VK_ROOT}/installer/helm/chart/volcano --namespace volcano-monitoring \
diff --git a/installer/helm/chart/volcano/templates/controllers.yaml b/installer/helm/chart/volcano/templates/controllers.yaml
index 5354fa06b41..f4e412c880e 100644
--- a/installer/helm/chart/volcano/templates/controllers.yaml
+++ b/installer/helm/chart/volcano/templates/controllers.yaml
@@ -28,7 +28,7 @@ rules:
verbs: ["create", "list", "watch", "update", "patch"]
- apiGroups: [""]
resources: ["pods"]
- verbs: ["create", "get", "list", "watch", "update", "bind", "delete"]
+ verbs: ["create", "get", "list", "watch", "update", "bind", "delete", "patch"]
- apiGroups: [""]
resources: ["pods/finalizers"]
verbs: ["update", "patch"]
diff --git a/installer/helm/chart/volcano/templates/webhooks.yaml b/installer/helm/chart/volcano/templates/webhooks.yaml
new file mode 100644
index 00000000000..1a292115e1a
--- /dev/null
+++ b/installer/helm/chart/volcano/templates/webhooks.yaml
@@ -0,0 +1,294 @@
+{{- if .Values.custom.admission_enable }}
+
+{{- if .Values.custom.pods_mutatingwebhook_enable }}
+apiVersion: admissionregistration.k8s.io/v1
+kind: MutatingWebhookConfiguration
+metadata:
+ name: volcano-admission-service-pods-mutate
+webhooks:
+ - admissionReviewVersions:
+ - v1
+ clientConfig:
+ service:
+ name: {{ .Release.Name }}-admission-service
+ namespace: {{ .Release.Namespace }}
+ path: /pods/mutate
+ port: 443
+ failurePolicy: Fail
+ matchPolicy: Equivalent
+ name: mutatepod.volcano.sh
+ namespaceSelector:
+ matchExpressions:
+ - key: kubernetes.io/metadata.name
+ operator: NotIn
+ values:
+ - volcano-system
+ - kube-system
+ objectSelector: {}
+ reinvocationPolicy: Never
+ rules:
+ - apiGroups:
+ - ""
+ apiVersions:
+ - v1
+ operations:
+ - CREATE
+ resources:
+ - pods
+ scope: '*'
+ sideEffects: NoneOnDryRun
+ timeoutSeconds: 10
+{{- end }}
+
+---
+
+{{- if .Values.custom.queues_mutatingwebhook_enable }}
+apiVersion: admissionregistration.k8s.io/v1
+kind: MutatingWebhookConfiguration
+metadata:
+ name: volcano-admission-service-queues-mutate
+webhooks:
+ - admissionReviewVersions:
+ - v1
+ clientConfig:
+ service:
+ name: {{ .Release.Name }}-admission-service
+ namespace: {{ .Release.Namespace }}
+ path: /queues/mutate
+ port: 443
+ failurePolicy: Fail
+ matchPolicy: Equivalent
+ name: mutatequeue.volcano.sh
+ namespaceSelector:
+ matchExpressions:
+ - key: kubernetes.io/metadata.name
+ operator: NotIn
+ values:
+ - volcano-system
+ - kube-system
+ objectSelector: {}
+ reinvocationPolicy: Never
+ rules:
+ - apiGroups:
+ - scheduling.volcano.sh
+ apiVersions:
+ - v1beta1
+ operations:
+ - CREATE
+ resources:
+ - queues
+ scope: '*'
+ sideEffects: NoneOnDryRun
+ timeoutSeconds: 10
+{{- end }}
+
+---
+
+{{- if .Values.custom.podgroups_mutatingwebhook_enable }}
+apiVersion: admissionregistration.k8s.io/v1
+kind: MutatingWebhookConfiguration
+metadata:
+ name: volcano-admission-service-podgroups-mutate
+webhooks:
+ - admissionReviewVersions:
+ - v1
+ clientConfig:
+ service:
+ name: {{ .Release.Name }}-admission-service
+ namespace: {{ .Release.Namespace }}
+ path: /podgroups/mutate
+ port: 443
+ failurePolicy: Fail
+ matchPolicy: Equivalent
+ name: mutatepodgroup.volcano.sh
+ namespaceSelector:
+ matchExpressions:
+ - key: kubernetes.io/metadata.name
+ operator: NotIn
+ values:
+ - volcano-system
+ - kube-system
+ objectSelector: {}
+ reinvocationPolicy: Never
+ rules:
+ - apiGroups:
+ - scheduling.volcano.sh
+ apiVersions:
+ - v1beta1
+ operations:
+ - CREATE
+ resources:
+ - podgroups
+ scope: '*'
+ sideEffects: NoneOnDryRun
+ timeoutSeconds: 10
+{{- end }}
+
+---
+
+{{- if .Values.custom.jobs_mutatingwebhook_enable }}
+apiVersion: admissionregistration.k8s.io/v1
+kind: MutatingWebhookConfiguration
+metadata:
+ name: volcano-admission-service-jobs-mutate
+webhooks:
+ - admissionReviewVersions:
+ - v1
+ clientConfig:
+ service:
+ name: {{ .Release.Name }}-admission-service
+ namespace: {{ .Release.Namespace }}
+ path: /jobs/mutate
+ port: 443
+ failurePolicy: Fail
+ matchPolicy: Equivalent
+ name: mutatejob.volcano.sh
+ namespaceSelector:
+ matchExpressions:
+ - key: kubernetes.io/metadata.name
+ operator: NotIn
+ values:
+ - volcano-system
+ - kube-system
+ objectSelector: {}
+ reinvocationPolicy: Never
+ rules:
+ - apiGroups:
+ - batch.volcano.sh
+ apiVersions:
+ - v1alpha1
+ operations:
+ - CREATE
+ resources:
+ - jobs
+ scope: '*'
+ sideEffects: NoneOnDryRun
+ timeoutSeconds: 10
+{{- end }}
+
+---
+
+{{- if .Values.custom.jobs_validatingwebhook_enable }}
+apiVersion: admissionregistration.k8s.io/v1
+kind: ValidatingWebhookConfiguration
+metadata:
+ name: volcano-admission-service-jobs-validate
+webhooks:
+ - admissionReviewVersions:
+ - v1
+ clientConfig:
+ service:
+ name: {{ .Release.Name }}-admission-service
+ namespace: {{ .Release.Namespace }}
+ path: /jobs/validate
+ port: 443
+ failurePolicy: Fail
+ matchPolicy: Equivalent
+ name: validatejob.volcano.sh
+ namespaceSelector:
+ matchExpressions:
+ - key: kubernetes.io/metadata.name
+ operator: NotIn
+ values:
+ - volcano-system
+ - kube-system
+ objectSelector: {}
+ rules:
+ - apiGroups:
+ - batch.volcano.sh
+ apiVersions:
+ - v1alpha1
+ operations:
+ - CREATE
+ - UPDATE
+ resources:
+ - jobs
+ scope: '*'
+ sideEffects: NoneOnDryRun
+ timeoutSeconds: 10
+{{- end }}
+
+---
+
+{{- if .Values.custom.pods_validatingwebhook_enable }}
+apiVersion: admissionregistration.k8s.io/v1
+kind: ValidatingWebhookConfiguration
+metadata:
+ name: volcano-admission-service-pods-validate
+webhooks:
+ - admissionReviewVersions:
+ - v1
+ clientConfig:
+ service:
+ name: {{ .Release.Name }}-admission-service
+ namespace: {{ .Release.Namespace }}
+ path: /pods/validate
+ port: 443
+ failurePolicy: Fail
+ matchPolicy: Equivalent
+ name: validatepod.volcano.sh
+ namespaceSelector:
+ matchExpressions:
+ - key: kubernetes.io/metadata.name
+ operator: NotIn
+ values:
+ - volcano-system
+ - kube-system
+ objectSelector: {}
+ rules:
+ - apiGroups:
+ - ""
+ apiVersions:
+ - v1
+ operations:
+ - CREATE
+ resources:
+ - pods
+ scope: '*'
+ sideEffects: NoneOnDryRun
+ timeoutSeconds: 10
+{{- end }}
+
+---
+
+{{- if .Values.custom.queues_validatingwebhook_enable }}
+apiVersion: admissionregistration.k8s.io/v1
+kind: ValidatingWebhookConfiguration
+metadata:
+ name: volcano-admission-service-queues-validate
+webhooks:
+ - admissionReviewVersions:
+ - v1
+ clientConfig:
+ service:
+ name: {{ .Release.Name }}-admission-service
+ namespace: {{ .Release.Namespace }}
+ path: /queues/validate
+ port: 443
+ failurePolicy: Fail
+ matchPolicy: Equivalent
+ name: validatequeue.volcano.sh
+ namespaceSelector:
+ matchExpressions:
+ - key: kubernetes.io/metadata.name
+ operator: NotIn
+ values:
+ - volcano-system
+ - kube-system
+ objectSelector: {}
+ rules:
+ - apiGroups:
+ - scheduling.volcano.sh
+ apiVersions:
+ - v1beta1
+ operations:
+ - CREATE
+ - UPDATE
+ - DELETE
+ resources:
+ - queues
+ scope: '*'
+ sideEffects: NoneOnDryRun
+ timeoutSeconds: 10
+{{- end }}
+{{- end }}
\ No newline at end of file
diff --git a/installer/helm/chart/volcano/values.yaml b/installer/helm/chart/volcano/values.yaml
index dec749edf98..8bdb067a538 100644
--- a/installer/helm/chart/volcano/values.yaml
+++ b/installer/helm/chart/volcano/values.yaml
@@ -13,3 +13,10 @@ custom:
admission_enable: true
controller_enable: true
scheduler_enable: true
+ pods_mutatingwebhook_enable: true
+ queues_mutatingwebhook_enable: true
+ podgroups_mutatingwebhook_enable: true
+ jobs_mutatingwebhook_enable: true
+ jobs_validatingwebhook_enable: true
+ pods_validatingwebhook_enable: true
+ queues_validatingwebhook_enable: true
\ No newline at end of file
diff --git a/installer/volcano-development-arm64.yaml b/installer/volcano-development-arm64.yaml
index 2d916331eef..bd694457057 100644
--- a/installer/volcano-development-arm64.yaml
+++ b/installer/volcano-development-arm64.yaml
@@ -8433,7 +8433,7 @@ rules:
verbs: ["create", "list", "watch", "update", "patch"]
- apiGroups: [""]
resources: ["pods"]
- verbs: ["create", "get", "list", "watch", "update", "bind", "delete"]
+ verbs: ["create", "get", "list", "watch", "update", "bind", "delete", "patch"]
- apiGroups: [""]
resources: ["pods/finalizers"]
verbs: ["update", "patch"]
@@ -9102,3 +9102,276 @@ status:
plural: ""
conditions: []
storedVersions: []
+---
+# Source: volcano/templates/webhooks.yaml
+apiVersion: admissionregistration.k8s.io/v1
+kind: MutatingWebhookConfiguration
+metadata:
+ name: volcano-admission-service-pods-mutate
+webhooks:
+ - admissionReviewVersions:
+ - v1
+ clientConfig:
+ service:
+ name: volcano-admission-service
+ namespace: volcano-system
+ path: /pods/mutate
+ port: 443
+ failurePolicy: Fail
+ matchPolicy: Equivalent
+ name: mutatepod.volcano.sh
+ namespaceSelector:
+ matchExpressions:
+ - key: kubernetes.io/metadata.name
+ operator: NotIn
+ values:
+ - volcano-system
+ - kube-system
+ objectSelector: {}
+ reinvocationPolicy: Never
+ rules:
+ - apiGroups:
+ - ""
+ apiVersions:
+ - v1
+ operations:
+ - CREATE
+ resources:
+ - pods
+ scope: '*'
+ sideEffects: NoneOnDryRun
+ timeoutSeconds: 10
+---
+# Source: volcano/templates/webhooks.yaml
+apiVersion: admissionregistration.k8s.io/v1
+kind: MutatingWebhookConfiguration
+metadata:
+ name: volcano-admission-service-queues-mutate
+webhooks:
+ - admissionReviewVersions:
+ - v1
+ clientConfig:
+ service:
+ name: volcano-admission-service
+ namespace: volcano-system
+ path: /queues/mutate
+ port: 443
+ failurePolicy: Fail
+ matchPolicy: Equivalent
+ name: mutatequeue.volcano.sh
+ namespaceSelector:
+ matchExpressions:
+ - key: kubernetes.io/metadata.name
+ operator: NotIn
+ values:
+ - volcano-system
+ - kube-system
+ objectSelector: {}
+ reinvocationPolicy: Never
+ rules:
+ - apiGroups:
+ - scheduling.volcano.sh
+ apiVersions:
+ - v1beta1
+ operations:
+ - CREATE
+ resources:
+ - queues
+ scope: '*'
+ sideEffects: NoneOnDryRun
+ timeoutSeconds: 10
+---
+# Source: volcano/templates/webhooks.yaml
+apiVersion: admissionregistration.k8s.io/v1
+kind: MutatingWebhookConfiguration
+metadata:
+ name: volcano-admission-service-podgroups-mutate
+webhooks:
+ - admissionReviewVersions:
+ - v1
+ clientConfig:
+ service:
+ name: volcano-admission-service
+ namespace: volcano-system
+ path: /podgroups/mutate
+ port: 443
+ failurePolicy: Fail
+ matchPolicy: Equivalent
+ name: mutatepodgroup.volcano.sh
+ namespaceSelector:
+ matchExpressions:
+ - key: kubernetes.io/metadata.name
+ operator: NotIn
+ values:
+ - volcano-system
+ - kube-system
+ objectSelector: {}
+ reinvocationPolicy: Never
+ rules:
+ - apiGroups:
+ - scheduling.volcano.sh
+ apiVersions:
+ - v1beta1
+ operations:
+ - CREATE
+ resources:
+ - podgroups
+ scope: '*'
+ sideEffects: NoneOnDryRun
+ timeoutSeconds: 10
+---
+# Source: volcano/templates/webhooks.yaml
+apiVersion: admissionregistration.k8s.io/v1
+kind: MutatingWebhookConfiguration
+metadata:
+ name: volcano-admission-service-jobs-mutate
+webhooks:
+ - admissionReviewVersions:
+ - v1
+ clientConfig:
+ service:
+ name: volcano-admission-service
+ namespace: volcano-system
+ path: /jobs/mutate
+ port: 443
+ failurePolicy: Fail
+ matchPolicy: Equivalent
+ name: mutatejob.volcano.sh
+ namespaceSelector:
+ matchExpressions:
+ - key: kubernetes.io/metadata.name
+ operator: NotIn
+ values:
+ - volcano-system
+ - kube-system
+ objectSelector: {}
+ reinvocationPolicy: Never
+ rules:
+ - apiGroups:
+ - batch.volcano.sh
+ apiVersions:
+ - v1alpha1
+ operations:
+ - CREATE
+ resources:
+ - jobs
+ scope: '*'
+ sideEffects: NoneOnDryRun
+ timeoutSeconds: 10
+---
+# Source: volcano/templates/webhooks.yaml
+apiVersion: admissionregistration.k8s.io/v1
+kind: ValidatingWebhookConfiguration
+metadata:
+ name: volcano-admission-service-jobs-validate
+webhooks:
+ - admissionReviewVersions:
+ - v1
+ clientConfig:
+ service:
+ name: volcano-admission-service
+ namespace: volcano-system
+ path: /jobs/validate
+ port: 443
+ failurePolicy: Fail
+ matchPolicy: Equivalent
+ name: validatejob.volcano.sh
+ namespaceSelector:
+ matchExpressions:
+ - key: kubernetes.io/metadata.name
+ operator: NotIn
+ values:
+ - volcano-system
+ - kube-system
+ objectSelector: {}
+ rules:
+ - apiGroups:
+ - batch.volcano.sh
+ apiVersions:
+ - v1alpha1
+ operations:
+ - CREATE
+ - UPDATE
+ resources:
+ - jobs
+ scope: '*'
+ sideEffects: NoneOnDryRun
+ timeoutSeconds: 10
+---
+# Source: volcano/templates/webhooks.yaml
+apiVersion: admissionregistration.k8s.io/v1
+kind: ValidatingWebhookConfiguration
+metadata:
+ name: volcano-admission-service-pods-validate
+webhooks:
+ - admissionReviewVersions:
+ - v1
+ clientConfig:
+ service:
+ name: volcano-admission-service
+ namespace: volcano-system
+ path: /pods/validate
+ port: 443
+ failurePolicy: Fail
+ matchPolicy: Equivalent
+ name: validatepod.volcano.sh
+ namespaceSelector:
+ matchExpressions:
+ - key: kubernetes.io/metadata.name
+ operator: NotIn
+ values:
+ - volcano-system
+ - kube-system
+ objectSelector: {}
+ rules:
+ - apiGroups:
+ - ""
+ apiVersions:
+ - v1
+ operations:
+ - CREATE
+ resources:
+ - pods
+ scope: '*'
+ sideEffects: NoneOnDryRun
+ timeoutSeconds: 10
+---
+# Source: volcano/templates/webhooks.yaml
+apiVersion: admissionregistration.k8s.io/v1
+kind: ValidatingWebhookConfiguration
+metadata:
+ name: volcano-admission-service-queues-validate
+webhooks:
+ - admissionReviewVersions:
+ - v1
+ clientConfig:
+ service:
+ name: volcano-admission-service
+ namespace: volcano-system
+ path: /queues/validate
+ port: 443
+ failurePolicy: Fail
+ matchPolicy: Equivalent
+ name: validatequeue.volcano.sh
+ namespaceSelector:
+ matchExpressions:
+ - key: kubernetes.io/metadata.name
+ operator: NotIn
+ values:
+ - volcano-system
+ - kube-system
+ objectSelector: {}
+ rules:
+ - apiGroups:
+ - scheduling.volcano.sh
+ apiVersions:
+ - v1beta1
+ operations:
+ - CREATE
+ - UPDATE
+ - DELETE
+ resources:
+ - queues
+ scope: '*'
+ sideEffects: NoneOnDryRun
+ timeoutSeconds: 10
diff --git a/installer/volcano-development.yaml b/installer/volcano-development.yaml
index 94b7a385e67..aac32554967 100644
--- a/installer/volcano-development.yaml
+++ b/installer/volcano-development.yaml
@@ -8433,7 +8433,7 @@ rules:
verbs: ["create", "list", "watch", "update", "patch"]
- apiGroups: [""]
resources: ["pods"]
- verbs: ["create", "get", "list", "watch", "update", "bind", "delete"]
+ verbs: ["create", "get", "list", "watch", "update", "bind", "delete", "patch"]
- apiGroups: [""]
resources: ["pods/finalizers"]
verbs: ["update", "patch"]
@@ -9102,3 +9102,276 @@ status:
plural: ""
conditions: []
storedVersions: []
+---
+# Source: volcano/templates/webhooks.yaml
+apiVersion: admissionregistration.k8s.io/v1
+kind: MutatingWebhookConfiguration
+metadata:
+ name: volcano-admission-service-pods-mutate
+webhooks:
+ - admissionReviewVersions:
+ - v1
+ clientConfig:
+ service:
+ name: volcano-admission-service
+ namespace: volcano-system
+ path: /pods/mutate
+ port: 443
+ failurePolicy: Fail
+ matchPolicy: Equivalent
+ name: mutatepod.volcano.sh
+ namespaceSelector:
+ matchExpressions:
+ - key: kubernetes.io/metadata.name
+ operator: NotIn
+ values:
+ - volcano-system
+ - kube-system
+ objectSelector: {}
+ reinvocationPolicy: Never
+ rules:
+ - apiGroups:
+ - ""
+ apiVersions:
+ - v1
+ operations:
+ - CREATE
+ resources:
+ - pods
+ scope: '*'
+ sideEffects: NoneOnDryRun
+ timeoutSeconds: 10
+---
+# Source: volcano/templates/webhooks.yaml
+apiVersion: admissionregistration.k8s.io/v1
+kind: MutatingWebhookConfiguration
+metadata:
+ name: volcano-admission-service-queues-mutate
+webhooks:
+ - admissionReviewVersions:
+ - v1
+ clientConfig:
+ service:
+ name: volcano-admission-service
+ namespace: volcano-system
+ path: /queues/mutate
+ port: 443
+ failurePolicy: Fail
+ matchPolicy: Equivalent
+ name: mutatequeue.volcano.sh
+ namespaceSelector:
+ matchExpressions:
+ - key: kubernetes.io/metadata.name
+ operator: NotIn
+ values:
+ - volcano-system
+ - kube-system
+ objectSelector: {}
+ reinvocationPolicy: Never
+ rules:
+ - apiGroups:
+ - scheduling.volcano.sh
+ apiVersions:
+ - v1beta1
+ operations:
+ - CREATE
+ resources:
+ - queues
+ scope: '*'
+ sideEffects: NoneOnDryRun
+ timeoutSeconds: 10
+---
+# Source: volcano/templates/webhooks.yaml
+apiVersion: admissionregistration.k8s.io/v1
+kind: MutatingWebhookConfiguration
+metadata:
+ name: volcano-admission-service-podgroups-mutate
+webhooks:
+ - admissionReviewVersions:
+ - v1
+ clientConfig:
+ service:
+ name: volcano-admission-service
+ namespace: volcano-system
+ path: /podgroups/mutate
+ port: 443
+ failurePolicy: Fail
+ matchPolicy: Equivalent
+ name: mutatepodgroup.volcano.sh
+ namespaceSelector:
+ matchExpressions:
+ - key: kubernetes.io/metadata.name
+ operator: NotIn
+ values:
+ - volcano-system
+ - kube-system
+ objectSelector: {}
+ reinvocationPolicy: Never
+ rules:
+ - apiGroups:
+ - scheduling.volcano.sh
+ apiVersions:
+ - v1beta1
+ operations:
+ - CREATE
+ resources:
+ - podgroups
+ scope: '*'
+ sideEffects: NoneOnDryRun
+ timeoutSeconds: 10
+---
+# Source: volcano/templates/webhooks.yaml
+apiVersion: admissionregistration.k8s.io/v1
+kind: MutatingWebhookConfiguration
+metadata:
+ name: volcano-admission-service-jobs-mutate
+webhooks:
+ - admissionReviewVersions:
+ - v1
+ clientConfig:
+ service:
+ name: volcano-admission-service
+ namespace: volcano-system
+ path: /jobs/mutate
+ port: 443
+ failurePolicy: Fail
+ matchPolicy: Equivalent
+ name: mutatejob.volcano.sh
+ namespaceSelector:
+ matchExpressions:
+ - key: kubernetes.io/metadata.name
+ operator: NotIn
+ values:
+ - volcano-system
+ - kube-system
+ objectSelector: {}
+ reinvocationPolicy: Never
+ rules:
+ - apiGroups:
+ - batch.volcano.sh
+ apiVersions:
+ - v1alpha1
+ operations:
+ - CREATE
+ resources:
+ - jobs
+ scope: '*'
+ sideEffects: NoneOnDryRun
+ timeoutSeconds: 10
+---
+# Source: volcano/templates/webhooks.yaml
+apiVersion: admissionregistration.k8s.io/v1
+kind: ValidatingWebhookConfiguration
+metadata:
+ name: volcano-admission-service-jobs-validate
+webhooks:
+ - admissionReviewVersions:
+ - v1
+ clientConfig:
+ service:
+ name: volcano-admission-service
+ namespace: volcano-system
+ path: /jobs/validate
+ port: 443
+ failurePolicy: Fail
+ matchPolicy: Equivalent
+ name: validatejob.volcano.sh
+ namespaceSelector:
+ matchExpressions:
+ - key: kubernetes.io/metadata.name
+ operator: NotIn
+ values:
+ - volcano-system
+ - kube-system
+ objectSelector: {}
+ rules:
+ - apiGroups:
+ - batch.volcano.sh
+ apiVersions:
+ - v1alpha1
+ operations:
+ - CREATE
+ - UPDATE
+ resources:
+ - jobs
+ scope: '*'
+ sideEffects: NoneOnDryRun
+ timeoutSeconds: 10
+---
+# Source: volcano/templates/webhooks.yaml
+apiVersion: admissionregistration.k8s.io/v1
+kind: ValidatingWebhookConfiguration
+metadata:
+ name: volcano-admission-service-pods-validate
+webhooks:
+ - admissionReviewVersions:
+ - v1
+ clientConfig:
+ service:
+ name: volcano-admission-service
+ namespace: volcano-system
+ path: /pods/validate
+ port: 443
+ failurePolicy: Fail
+ matchPolicy: Equivalent
+ name: validatepod.volcano.sh
+ namespaceSelector:
+ matchExpressions:
+ - key: kubernetes.io/metadata.name
+ operator: NotIn
+ values:
+ - volcano-system
+ - kube-system
+ objectSelector: {}
+ rules:
+ - apiGroups:
+ - ""
+ apiVersions:
+ - v1
+ operations:
+ - CREATE
+ resources:
+ - pods
+ scope: '*'
+ sideEffects: NoneOnDryRun
+ timeoutSeconds: 10
+---
+# Source: volcano/templates/webhooks.yaml
+apiVersion: admissionregistration.k8s.io/v1
+kind: ValidatingWebhookConfiguration
+metadata:
+ name: volcano-admission-service-queues-validate
+webhooks:
+ - admissionReviewVersions:
+ - v1
+ clientConfig:
+ service:
+ name: volcano-admission-service
+ namespace: volcano-system
+ path: /queues/validate
+ port: 443
+ failurePolicy: Fail
+ matchPolicy: Equivalent
+ name: validatequeue.volcano.sh
+ namespaceSelector:
+ matchExpressions:
+ - key: kubernetes.io/metadata.name
+ operator: NotIn
+ values:
+ - volcano-system
+ - kube-system
+ objectSelector: {}
+ rules:
+ - apiGroups:
+ - scheduling.volcano.sh
+ apiVersions:
+ - v1beta1
+ operations:
+ - CREATE
+ - UPDATE
+ - DELETE
+ resources:
+ - queues
+ scope: '*'
+ sideEffects: NoneOnDryRun
+ timeoutSeconds: 10
diff --git a/pkg/controllers/OWNERS b/pkg/controllers/OWNERS
index 652367ca1f5..6b0d1e8c28c 100644
--- a/pkg/controllers/OWNERS
+++ b/pkg/controllers/OWNERS
@@ -1,7 +1,9 @@
reviewers:
- hzxuzhonghu
- TommyLike
+ - hwdef
approvers:
- hzxuzhonghu
- TommyLike
- shinytang6
+ - hwdef
diff --git a/pkg/controllers/cache/cache.go b/pkg/controllers/cache/cache.go
index 9e6d1fdab52..3c480f57059 100644
--- a/pkg/controllers/cache/cache.go
+++ b/pkg/controllers/cache/cache.go
@@ -154,17 +154,19 @@ func (jc *jobCache) Update(obj *v1alpha1.Job) error {
return fmt.Errorf("failed to find job <%v>", key)
}
- var oldResourceversion, newResourceversion uint64
- var err error
- if oldResourceversion, err = strconv.ParseUint(job.Job.ResourceVersion, 10, 64); err != nil {
- return fmt.Errorf("failed to parase job <%v> resource version <%s>", key, job.Job.ResourceVersion)
- }
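+ // job.Job may still be nil when the cache entry was created from pod events before the Job
+ // object itself was observed; skip the resource-version comparison in that case.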
+ if job.Job != nil {
+ var oldResourceversion, newResourceversion uint64
+ var err error
+ if oldResourceversion, err = strconv.ParseUint(job.Job.ResourceVersion, 10, 64); err != nil {
+ return fmt.Errorf("failed to parase job <%v> resource version <%s>", key, job.Job.ResourceVersion)
+ }
- if newResourceversion, err = strconv.ParseUint(obj.ResourceVersion, 10, 64); err != nil {
- return fmt.Errorf("failed to parase job <%v> resource version <%s>", key, obj.ResourceVersion)
- }
- if newResourceversion < oldResourceversion {
- return fmt.Errorf("job <%v> has too old resource version: %d (%d)", key, newResourceversion, oldResourceversion)
+ if newResourceversion, err = strconv.ParseUint(obj.ResourceVersion, 10, 64); err != nil {
+ return fmt.Errorf("failed to parase job <%v> resource version <%s>", key, obj.ResourceVersion)
+ }
+ if newResourceversion < oldResourceversion {
+ return fmt.Errorf("job <%v> has too old resource version: %d (%d)", key, newResourceversion, oldResourceversion)
+ }
}
job.Job = obj
return nil
diff --git a/pkg/controllers/job/job_controller_actions.go b/pkg/controllers/job/job_controller_actions.go
index 2a24f2d3b69..a77cb043c69 100644
--- a/pkg/controllers/job/job_controller_actions.go
+++ b/pkg/controllers/job/job_controller_actions.go
@@ -457,12 +457,10 @@ func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.Updat
RetryCount: job.Status.RetryCount,
}
- if updateStatus != nil {
- if updateStatus(&job.Status) {
- job.Status.State.LastTransitionTime = metav1.Now()
- jobCondition = newCondition(job.Status.State.Phase, &job.Status.State.LastTransitionTime)
- job.Status.Conditions = append(job.Status.Conditions, jobCondition)
- }
+ if updateStatus != nil && updateStatus(&job.Status) {
+ job.Status.State.LastTransitionTime = metav1.Now()
+ jobCondition = newCondition(job.Status.State.Phase, &job.Status.State.LastTransitionTime)
+ job.Status.Conditions = append(job.Status.Conditions, jobCondition)
}
newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(context.TODO(), job, metav1.UpdateOptions{})
if err != nil {
@@ -480,26 +478,28 @@ func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.Updat
}
func (cc *jobcontroller) waitDependsOnTaskMeetCondition(taskName string, taskIndex int, podToCreateEachTask []*v1.Pod, job *batch.Job) {
- if job.Spec.Tasks[taskIndex].DependsOn != nil {
- dependsOn := *job.Spec.Tasks[taskIndex].DependsOn
- if len(dependsOn.Name) > 1 && dependsOn.Iteration == batch.IterationAny {
+ if job.Spec.Tasks[taskIndex].DependsOn == nil {
+ return
+ }
+
+ dependsOn := *job.Spec.Tasks[taskIndex].DependsOn
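+ // With IterationAny it is enough for any one of the depended-on tasks to become ready;
+ // otherwise wait until every depended-on task is ready.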
+ if len(dependsOn.Name) > 1 && dependsOn.Iteration == batch.IterationAny {
+ wait.PollInfinite(detectionPeriodOfDependsOntask, func() (bool, error) {
+ for _, task := range dependsOn.Name {
+ if cc.isDependsOnPodsReady(task, job) {
+ return true, nil
+ }
+ }
+ return false, nil
+ })
+ } else {
+ for _, dependsOnTask := range dependsOn.Name {
wait.PollInfinite(detectionPeriodOfDependsOntask, func() (bool, error) {
- for _, task := range dependsOn.Name {
- if cc.isDependsOnPodsReady(task, job) {
- return true, nil
- }
+ if cc.isDependsOnPodsReady(dependsOnTask, job) {
+ return true, nil
}
return false, nil
})
- } else {
- for _, dependsOnTask := range dependsOn.Name {
- wait.PollInfinite(detectionPeriodOfDependsOntask, func() (bool, error) {
- if cc.isDependsOnPodsReady(dependsOnTask, job) {
- return true, nil
- }
- return false, nil
- })
- }
}
}
}
@@ -511,6 +511,15 @@ func (cc *jobcontroller) isDependsOnPodsReady(task string, job *batch.Job) bool
for _, podName := range dependsOnPods {
pod, err := cc.podLister.Pods(job.Namespace).Get(podName)
if err != nil {
+ // If the pod is not found, there are two possibilities:
+ // 1. The vcjob has been deleted, so this function should return true.
+ // 2. The pod has not been created yet, so this function should return false and keep waiting.
+ if apierrors.IsNotFound(err) {
+ _, errGetJob := cc.jobLister.Jobs(job.Namespace).Get(job.Name)
+ if errGetJob != nil {
+ return apierrors.IsNotFound(errGetJob)
+ }
+ }
klog.Errorf("Failed to get pod %v/%v %v", job.Namespace, podName, err)
continue
}
@@ -632,51 +641,63 @@ func (cc *jobcontroller) createPVC(job *batch.Job, vcName string, volumeClaim *v
func (cc *jobcontroller) createOrUpdatePodGroup(job *batch.Job) error {
// If PodGroup does not exist, create one for Job.
pgName := job.Name + "-" + string(job.UID)
- pg, err := cc.pgLister.PodGroups(job.Namespace).Get(pgName)
+ var pg *scheduling.PodGroup
+ var err error
+ pg, err = cc.pgLister.PodGroups(job.Namespace).Get(pgName)
if err != nil {
if !apierrors.IsNotFound(err) {
klog.Errorf("Failed to get PodGroup for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return err
- }
+ } else {
+ // fall back to the old PodGroup (named after the job) if the new one does not exist
+ pg, err = cc.pgLister.PodGroups(job.Namespace).Get(job.Name)
+ if err != nil {
+ if !apierrors.IsNotFound(err) {
+ klog.Errorf("Failed to get PodGroup for Job <%s/%s>: %v",
+ job.Namespace, job.Name, err)
+ return err
+ }
- minTaskMember := map[string]int32{}
- for _, task := range job.Spec.Tasks {
- if task.MinAvailable != nil {
- minTaskMember[task.Name] = *task.MinAvailable
- } else {
- minTaskMember[task.Name] = task.Replicas
- }
- }
+ minTaskMember := map[string]int32{}
+ for _, task := range job.Spec.Tasks {
+ if task.MinAvailable != nil {
+ minTaskMember[task.Name] = *task.MinAvailable
+ } else {
+ minTaskMember[task.Name] = task.Replicas
+ }
+ }
- pg := &scheduling.PodGroup{
- ObjectMeta: metav1.ObjectMeta{
- Namespace: job.Namespace,
- //add job.UID into its name when create new PodGroup
- Name: pgName,
- Annotations: job.Annotations,
- Labels: job.Labels,
- OwnerReferences: []metav1.OwnerReference{
- *metav1.NewControllerRef(job, helpers.JobKind),
- },
- },
- Spec: scheduling.PodGroupSpec{
- MinMember: job.Spec.MinAvailable,
- MinTaskMember: minTaskMember,
- Queue: job.Spec.Queue,
- MinResources: cc.calcPGMinResources(job),
- PriorityClassName: job.Spec.PriorityClassName,
- },
- }
+ pg := &scheduling.PodGroup{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: job.Namespace,
+ // add job.UID into its name when creating a new PodGroup
+ Name: pgName,
+ Annotations: job.Annotations,
+ Labels: job.Labels,
+ OwnerReferences: []metav1.OwnerReference{
+ *metav1.NewControllerRef(job, helpers.JobKind),
+ },
+ },
+ Spec: scheduling.PodGroupSpec{
+ MinMember: job.Spec.MinAvailable,
+ MinTaskMember: minTaskMember,
+ Queue: job.Spec.Queue,
+ MinResources: cc.calcPGMinResources(job),
+ PriorityClassName: job.Spec.PriorityClassName,
+ },
+ }
- if _, err = cc.vcClient.SchedulingV1beta1().PodGroups(job.Namespace).Create(context.TODO(), pg, metav1.CreateOptions{}); err != nil {
- if !apierrors.IsAlreadyExists(err) {
- klog.Errorf("Failed to create PodGroup for Job <%s/%s>: %v",
- job.Namespace, job.Name, err)
- return err
+ if _, err = cc.vcClient.SchedulingV1beta1().PodGroups(job.Namespace).Create(context.TODO(), pg, metav1.CreateOptions{}); err != nil {
+ if !apierrors.IsAlreadyExists(err) {
+ klog.Errorf("Failed to create PodGroup for Job <%s/%s>: %v",
+ job.Namespace, job.Name, err)
+ return err
+ }
+ }
+ return nil
}
}
- return nil
}
pgShouldUpdate := false
diff --git a/pkg/controllers/job/plugins/distributed-framework/pytorch/pytorch.go b/pkg/controllers/job/plugins/distributed-framework/pytorch/pytorch.go
new file mode 100644
index 00000000000..9b66112cf28
--- /dev/null
+++ b/pkg/controllers/job/plugins/distributed-framework/pytorch/pytorch.go
@@ -0,0 +1,180 @@
+package pytorch
+
+import (
+ "flag"
+ "fmt"
+ "strconv"
+
+ v1 "k8s.io/api/core/v1"
+ "k8s.io/klog"
+
+ batch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
+ "volcano.sh/volcano/pkg/controllers/job/helpers"
+ pluginsinterface "volcano.sh/volcano/pkg/controllers/job/plugins/interface"
+)
+
+const (
+ // PytorchPluginName is the name of the plugin
+ PytorchPluginName = "pytorch"
+ // DefaultPort is the default port for pytorch
+ DefaultPort = 23456
+ // DefaultMaster is the default task name of master host
+ DefaultMaster = "master"
+ // DefaultWorker is the default task name of worker host
+ DefaultWorker = "worker"
+
+ // EnvMasterPort is the env name of master port
+ EnvMasterPort = "MASTER_PORT"
+ // EnvMasterAddr is the env name of master addr
+ EnvMasterAddr = "MASTER_ADDR"
+ // EnvWorldSize is the env name of world size
+ EnvWorldSize = "WORLD_SIZE"
+ // EnvRank is the env name of rank
+ EnvRank = "RANK"
+)
+
+type pytorchPlugin struct {
+ pytorchArguments []string
+ clientset pluginsinterface.PluginClientset
+ masterName string
+ workerName string
+ port int
+}
+
+// New creates a pytorch plugin.
+func New(client pluginsinterface.PluginClientset, arguments []string) pluginsinterface.PluginInterface {
+ pp := pytorchPlugin{pytorchArguments: arguments, clientset: client}
+ pp.addFlags()
+ return &pp
+}
+
+func (pp *pytorchPlugin) addFlags() {
+ flagSet := flag.NewFlagSet(pp.Name(), flag.ContinueOnError)
+ flagSet.StringVar(&pp.masterName, "master", DefaultMaster, "name of master role task")
+ flagSet.StringVar(&pp.workerName, "worker", DefaultWorker, "name of worker role task")
+ flagSet.IntVar(&pp.port, "port", DefaultPort, "open port for containers")
+ if err := flagSet.Parse(pp.pytorchArguments); err != nil {
+ klog.Errorf("plugin %s flagset parse failed, err: %v", pp.Name(), err)
+ }
+}
+
+func (pp *pytorchPlugin) Name() string {
+ return PytorchPluginName
+}
+
+func (pp *pytorchPlugin) OnPodCreate(pod *v1.Pod, job *batch.Job) error {
+ taskType := helpers.GetTaskKey(pod)
+ masterIndex := helpers.GetTasklndexUnderJob(pp.masterName, job)
+ if masterIndex == -1 {
+ klog.Errorf("job %v doesn't have task %v", job.Name, pp.masterName)
+ return nil
+ }
+
+ masterEnvVars := []v1.EnvVar{}
+ masterAddr := pp.generateMasterAddr(job.Spec.Tasks[masterIndex], job.Name)
+ masterEnvVars = append(masterEnvVars, v1.EnvVar{
+ Name: EnvMasterAddr,
+ Value: masterAddr,
+ }, v1.EnvVar{
+ Name: EnvMasterPort,
+ Value: fmt.Sprintf("%v", pp.port),
+ })
+
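+ // Rank assignment: the master task is always rank 0, and the i-th worker pod gets rank i+1.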
+ masterRank := 0
+ workerRank := 0
+ if taskType == pp.workerName {
+ index, err := strconv.Atoi(helpers.GetPodIndexUnderTask(pod))
+ if err != nil {
+ return err
+ }
+
+ workerRank = index + 1
+ }
+
+ totalReplicas := pp.getTotalReplicas(job)
+ for i, c := range pod.Spec.Containers {
+ pp.openContainerPort(&c, i, pod)
+
+ pod.Spec.Containers[i].Env = append(pod.Spec.Containers[i].Env, masterEnvVars...)
+ pod.Spec.Containers[i].Env = append(pod.Spec.Containers[i].Env, v1.EnvVar{
+ Name: EnvWorldSize,
+ Value: strconv.Itoa(int(totalReplicas)),
+ })
+
+ if taskType == pp.workerName {
+ pod.Spec.Containers[i].Env = append(pod.Spec.Containers[i].Env, v1.EnvVar{
+ Name: EnvRank,
+ Value: strconv.Itoa(workerRank),
+ })
+ } else if taskType == pp.masterName {
+ pod.Spec.Containers[i].Env = append(pod.Spec.Containers[i].Env, v1.EnvVar{
+ Name: EnvRank,
+ Value: strconv.Itoa(masterRank),
+ })
+ }
+ }
+
+ return nil
+}
+
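+// getTotalReplicas sums the replicas of every task in the job; the result is exported as WORLD_SIZE.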
+func (pp *pytorchPlugin) getTotalReplicas(job *batch.Job) int32 {
+ jobReplicas := int32(0)
+ for _, task := range job.Spec.Tasks {
+ jobReplicas += task.Replicas
+ }
+
+ return jobReplicas
+}
+
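+// generateMasterAddr builds the master address as <hostname>.<subdomain>, defaulting the hostname
+// to the generated name of the master task's first pod and the subdomain to the job name.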
+func (pp *pytorchPlugin) generateMasterAddr(task batch.TaskSpec, jobName string) string {
+ hostName := task.Template.Spec.Hostname
+ subdomain := task.Template.Spec.Subdomain
+ if len(hostName) == 0 {
+ hostName = helpers.MakePodName(jobName, task.Name, 0)
+ }
+ if len(subdomain) == 0 {
+ subdomain = jobName
+ }
+
+ host := hostName + "." + subdomain
+ return host
+}
+
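+// openContainerPort adds a "pytorchjob-port" container port for the configured port unless the
+// container already declares a port with that number.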
+func (pp *pytorchPlugin) openContainerPort(c *v1.Container, index int, pod *v1.Pod) {
+ hasPort := false
+ for _, p := range c.Ports {
+ if p.ContainerPort == int32(pp.port) {
+ hasPort = true
+ break
+ }
+ }
+
+ if !hasPort {
+ port := v1.ContainerPort{
+ Name: "pytorchjob-port",
+ ContainerPort: int32(pp.port),
+ }
+
+ pod.Spec.Containers[index].Ports = append(pod.Spec.Containers[index].Ports, port)
+ }
+}
+
+func (pp *pytorchPlugin) OnJobAdd(job *batch.Job) error {
+ if job.Status.ControlledResources["plugin-"+pp.Name()] == pp.Name() {
+ return nil
+ }
+ job.Status.ControlledResources["plugin-"+pp.Name()] = pp.Name()
+ return nil
+}
+
+func (pp *pytorchPlugin) OnJobDelete(job *batch.Job) error {
+ if job.Status.ControlledResources["plugin-"+pp.Name()] != pp.Name() {
+ return nil
+ }
+ delete(job.Status.ControlledResources, "plugin-"+pp.Name())
+ return nil
+}
+
+func (pp *pytorchPlugin) OnJobUpdate(job *batch.Job) error {
+ return nil
+}
diff --git a/pkg/controllers/job/plugins/distributed-framework/pytorch/pytorch_test.go b/pkg/controllers/job/plugins/distributed-framework/pytorch/pytorch_test.go
new file mode 100644
index 00000000000..95abbd43896
--- /dev/null
+++ b/pkg/controllers/job/plugins/distributed-framework/pytorch/pytorch_test.go
@@ -0,0 +1,373 @@
+package pytorch
+
+import (
+ "fmt"
+ "reflect"
+ "testing"
+
+ v1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+ "volcano.sh/apis/pkg/apis/batch/v1alpha1"
+ pluginsinterface "volcano.sh/volcano/pkg/controllers/job/plugins/interface"
+)
+
+func TestPytorch(t *testing.T) {
+ plugins := make(map[string][]string)
+ plugins[PytorchPluginName] = []string{"--port=5000"}
+
+ testcases := []struct {
+ Name string
+ Job *v1alpha1.Job
+ Pod *v1.Pod
+ port int
+ envs []v1.EnvVar
+ }{
+ {
+ Name: "test pod without master",
+ Job: &v1alpha1.Job{
+ ObjectMeta: metav1.ObjectMeta{Name: "test-pytorch"},
+ Spec: v1alpha1.JobSpec{
+ Tasks: []v1alpha1.TaskSpec{
+ {
+ Name: "worker",
+ Replicas: 1,
+ Template: v1.PodTemplateSpec{},
+ },
+ },
+ },
+ },
+ Pod: &v1.Pod{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-pytorch-worker-0",
+ },
+ Spec: v1.PodSpec{
+ Containers: []v1.Container{
+ {
+ Name: "worker",
+ },
+ },
+ },
+ },
+ port: -1,
+ envs: nil,
+ },
+ {
+ Name: "test master pod without port",
+ Job: &v1alpha1.Job{
+ ObjectMeta: metav1.ObjectMeta{Name: "test-pytorch"},
+ Spec: v1alpha1.JobSpec{
+ Tasks: []v1alpha1.TaskSpec{
+ {
+ Name: "master",
+ Replicas: 1,
+ Template: v1.PodTemplateSpec{},
+ },
+ {
+ Name: "worker",
+ Replicas: 1,
+ Template: v1.PodTemplateSpec{},
+ },
+ },
+ },
+ },
+ Pod: &v1.Pod{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-pytorch-master-0",
+ Annotations: map[string]string{
+ v1alpha1.TaskSpecKey: "master",
+ },
+ },
+ Spec: v1.PodSpec{
+ Containers: []v1.Container{
+ {
+ Name: "master",
+ },
+ },
+ },
+ },
+ port: DefaultPort,
+ envs: []v1.EnvVar{
+ {
+ Name: EnvMasterAddr,
+ Value: "test-pytorch-master-0.test-pytorch",
+ },
+ {
+ Name: EnvMasterPort,
+ Value: fmt.Sprintf("%v", DefaultPort),
+ },
+ {
+ Name: "WORLD_SIZE",
+ Value: fmt.Sprintf("%v", 2),
+ },
+ {
+ Name: "RANK",
+ Value: fmt.Sprintf("%v", 0),
+ },
+ },
+ },
+ {
+ Name: "test master pod with port",
+ Job: &v1alpha1.Job{
+ ObjectMeta: metav1.ObjectMeta{Name: "test-pytorch"},
+ Spec: v1alpha1.JobSpec{
+ Tasks: []v1alpha1.TaskSpec{
+ {
+ Name: "master",
+ Replicas: 1,
+ Template: v1.PodTemplateSpec{},
+ },
+ {
+ Name: "worker",
+ Replicas: 1,
+ Template: v1.PodTemplateSpec{},
+ },
+ },
+ },
+ },
+ Pod: &v1.Pod{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-pytorch-master-0",
+ Annotations: map[string]string{
+ v1alpha1.TaskSpecKey: "master",
+ },
+ },
+ Spec: v1.PodSpec{
+ Containers: []v1.Container{
+ {
+ Name: "master",
+ Ports: []v1.ContainerPort{
+ {
+ Name: "pytorchjob-port",
+ ContainerPort: 23456,
+ },
+ },
+ },
+ },
+ },
+ },
+ port: DefaultPort,
+ envs: []v1.EnvVar{
+ {
+ Name: EnvMasterAddr,
+ Value: "test-pytorch-master-0.test-pytorch",
+ },
+ {
+ Name: EnvMasterPort,
+ Value: fmt.Sprintf("%v", DefaultPort),
+ },
+ {
+ Name: "WORLD_SIZE",
+ Value: fmt.Sprintf("%v", 2),
+ },
+ {
+ Name: "RANK",
+ Value: fmt.Sprintf("%v", 0),
+ },
+ },
+ },
+ {
+ Name: "test master pod env",
+ Job: &v1alpha1.Job{
+ ObjectMeta: metav1.ObjectMeta{Name: "test-pytorch"},
+ Spec: v1alpha1.JobSpec{
+ Tasks: []v1alpha1.TaskSpec{
+ {
+ Name: "master",
+ Replicas: 1,
+ Template: v1.PodTemplateSpec{},
+ },
+ {
+ Name: "worker",
+ Replicas: 2,
+ Template: v1.PodTemplateSpec{},
+ },
+ },
+ },
+ },
+ Pod: &v1.Pod{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-pytorch-master-0",
+ Annotations: map[string]string{
+ v1alpha1.TaskSpecKey: "master",
+ },
+ },
+ Spec: v1.PodSpec{
+ Containers: []v1.Container{
+ {
+ Name: "master",
+ Ports: []v1.ContainerPort{
+ {
+ Name: "pytorchjob-port",
+ ContainerPort: 123,
+ },
+ },
+ },
+ },
+ },
+ },
+ port: 123,
+ envs: []v1.EnvVar{
+ {
+ Name: EnvMasterAddr,
+ Value: "test-pytorch-master-0.test-pytorch",
+ },
+ {
+ Name: EnvMasterPort,
+ Value: fmt.Sprintf("%v", DefaultPort),
+ },
+ {
+ Name: "WORLD_SIZE",
+ Value: fmt.Sprintf("%v", 3),
+ },
+ {
+ Name: "RANK",
+ Value: fmt.Sprintf("%v", 0),
+ },
+ },
+ },
+ {
+ Name: "test worker-1 pod env",
+ Job: &v1alpha1.Job{
+ ObjectMeta: metav1.ObjectMeta{Name: "test-pytorch"},
+ Spec: v1alpha1.JobSpec{
+ Tasks: []v1alpha1.TaskSpec{
+ {
+ Name: "master",
+ Replicas: 1,
+ Template: v1.PodTemplateSpec{},
+ },
+ {
+ Name: "worker",
+ Replicas: 2,
+ Template: v1.PodTemplateSpec{},
+ },
+ },
+ },
+ },
+ Pod: &v1.Pod{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-pytorch-worker-0",
+ Annotations: map[string]string{
+ v1alpha1.TaskSpecKey: "worker",
+ },
+ },
+ Spec: v1.PodSpec{
+ Containers: []v1.Container{
+ {
+ Name: "worker",
+ Ports: []v1.ContainerPort{
+ {
+ Name: "pytorchjob-port",
+ ContainerPort: 123,
+ },
+ },
+ },
+ },
+ },
+ },
+ port: 123,
+ envs: []v1.EnvVar{
+ {
+ Name: EnvMasterAddr,
+ Value: "test-pytorch-master-0.test-pytorch",
+ },
+ {
+ Name: EnvMasterPort,
+ Value: fmt.Sprintf("%v", DefaultPort),
+ },
+ {
+ Name: "WORLD_SIZE",
+ Value: fmt.Sprintf("%v", 3),
+ },
+ {
+ Name: "RANK",
+ Value: fmt.Sprintf("%v", 1),
+ },
+ },
+ },
+ {
+ Name: "test worker-2 pod env",
+ Job: &v1alpha1.Job{
+ ObjectMeta: metav1.ObjectMeta{Name: "test-pytorch"},
+ Spec: v1alpha1.JobSpec{
+ Tasks: []v1alpha1.TaskSpec{
+ {
+ Name: "master",
+ Replicas: 1,
+ Template: v1.PodTemplateSpec{},
+ },
+ {
+ Name: "worker",
+ Replicas: 2,
+ Template: v1.PodTemplateSpec{},
+ },
+ },
+ },
+ },
+ Pod: &v1.Pod{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-pytorch-worker-1",
+ Annotations: map[string]string{
+ v1alpha1.TaskSpecKey: "worker",
+ },
+ },
+ Spec: v1.PodSpec{
+ Containers: []v1.Container{
+ {
+ Name: "worker",
+ Ports: []v1.ContainerPort{
+ {
+ Name: "pytorchjob-port",
+ ContainerPort: 123,
+ },
+ },
+ },
+ },
+ },
+ },
+ port: 123,
+ envs: []v1.EnvVar{
+ {
+ Name: EnvMasterAddr,
+ Value: "test-pytorch-master-0.test-pytorch",
+ },
+ {
+ Name: EnvMasterPort,
+ Value: fmt.Sprintf("%v", DefaultPort),
+ },
+ {
+ Name: "WORLD_SIZE",
+ Value: fmt.Sprintf("%v", 3),
+ },
+ {
+ Name: "RANK",
+ Value: fmt.Sprintf("%v", 2),
+ },
+ },
+ },
+ }
+
+ for index, testcase := range testcases {
+ t.Run(testcase.Name, func(t *testing.T) {
+ mp := New(pluginsinterface.PluginClientset{}, testcase.Job.Spec.Plugins[PytorchPluginName])
+ if err := mp.OnPodCreate(testcase.Pod, testcase.Job); err != nil {
+ t.Errorf("Case %d (%s): expect no error, but got error %v", index, testcase.Name, err)
+ }
+
+ if testcase.port != -1 {
+ if testcase.Pod.Spec.Containers[0].Ports == nil || testcase.Pod.Spec.Containers[0].Ports[0].ContainerPort != int32(testcase.port) {
+ t.Errorf("Case %d (%s): wrong port, got %d, expected %v", index, testcase.Name, testcase.Pod.Spec.Containers[0].Ports[0].ContainerPort, testcase.port)
+ }
+ } else {
+ if testcase.Pod.Spec.Containers[0].Ports != nil {
+ t.Errorf("Case %d (%s): wrong port, got %d, expected empty", index, testcase.Name, testcase.Pod.Spec.Containers[0].Ports[0].ContainerPort)
+ }
+ }
+
+ if !reflect.DeepEqual(testcase.Pod.Spec.Containers[0].Env, testcase.envs) {
+ t.Errorf("Case %d (%s): wrong envs, got %v, expected %v", index, testcase.Name, testcase.Pod.Spec.Containers[0].Env, testcase.envs)
+ }
+ })
+ }
+}
diff --git a/pkg/controllers/job/plugins/distributed-framework/tensorflow/tensorflow.go b/pkg/controllers/job/plugins/distributed-framework/tensorflow/tensorflow.go
index 13a90d20f28..1687a751c10 100644
--- a/pkg/controllers/job/plugins/distributed-framework/tensorflow/tensorflow.go
+++ b/pkg/controllers/job/plugins/distributed-framework/tensorflow/tensorflow.go
@@ -31,6 +31,8 @@ import (
)
const (
+ // TFPluginName is the name of the plugin
+ TFPluginName = "tensorflow"
// DefaultPort defines default port for service
DefaultPort = 2222
// TFConfig defines environment variables for TF
@@ -67,7 +69,7 @@ func (tp *tensorflowPlugin) addFlags() {
}
func (tp *tensorflowPlugin) Name() string {
- return "tensorflow"
+ return TFPluginName
}
func (tp *tensorflowPlugin) OnPodCreate(pod *v1.Pod, job *batch.Job) error {
diff --git a/pkg/controllers/job/plugins/factory.go b/pkg/controllers/job/plugins/factory.go
index 21256321b0e..2b649c11a36 100644
--- a/pkg/controllers/job/plugins/factory.go
+++ b/pkg/controllers/job/plugins/factory.go
@@ -20,6 +20,7 @@ import (
"sync"
"volcano.sh/volcano/pkg/controllers/job/plugins/distributed-framework/mpi"
+ "volcano.sh/volcano/pkg/controllers/job/plugins/distributed-framework/pytorch"
"volcano.sh/volcano/pkg/controllers/job/plugins/distributed-framework/tensorflow"
"volcano.sh/volcano/pkg/controllers/job/plugins/env"
pluginsinterface "volcano.sh/volcano/pkg/controllers/job/plugins/interface"
@@ -33,6 +34,7 @@ func init() {
RegisterPluginBuilder("svc", svc.New)
RegisterPluginBuilder("tensorflow", tensorflow.New)
RegisterPluginBuilder("mpi", mpi.New)
+ RegisterPluginBuilder("pytorch", pytorch.New)
}
var pluginMutex sync.Mutex
diff --git a/pkg/controllers/podgroup/pg_controller.go b/pkg/controllers/podgroup/pg_controller.go
index 56c8967d5b2..8651ab24eb2 100644
--- a/pkg/controllers/podgroup/pg_controller.go
+++ b/pkg/controllers/podgroup/pg_controller.go
@@ -33,6 +33,7 @@ import (
schedulinginformer "volcano.sh/apis/pkg/client/informers/externalversions/scheduling/v1beta1"
schedulinglister "volcano.sh/apis/pkg/client/listers/scheduling/v1beta1"
"volcano.sh/volcano/pkg/controllers/framework"
+ commonutil "volcano.sh/volcano/pkg/util"
)
func init() {
@@ -137,7 +138,7 @@ func (pg *pgcontroller) processNextReq() bool {
return true
}
- if !contains(pg.schedulerNames, pod.Spec.SchedulerName) {
+ if !commonutil.Contains(pg.schedulerNames, pod.Spec.SchedulerName) {
klog.V(5).Infof("pod %v/%v field SchedulerName is not matched", pod.Namespace, pod.Name)
return true
}
@@ -159,12 +160,3 @@ func (pg *pgcontroller) processNextReq() bool {
return true
}
-
-func contains(slice []string, element string) bool {
- for _, item := range slice {
- if item == element {
- return true
- }
- }
- return false
-}
diff --git a/pkg/controllers/podgroup/pg_controller_handler.go b/pkg/controllers/podgroup/pg_controller_handler.go
index a5bac2ee48c..931fba7bb74 100644
--- a/pkg/controllers/podgroup/pg_controller_handler.go
+++ b/pkg/controllers/podgroup/pg_controller_handler.go
@@ -18,6 +18,7 @@ package podgroup
import (
"context"
+ "encoding/json"
"strings"
v1 "k8s.io/api/core/v1"
@@ -28,6 +29,7 @@ import (
quotacore "k8s.io/kubernetes/pkg/quota/v1/evaluator/core"
"k8s.io/utils/clock"
+ "k8s.io/apimachinery/pkg/types"
"volcano.sh/apis/pkg/apis/helpers"
scheduling "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
)
@@ -37,6 +39,14 @@ type podRequest struct {
podNamespace string
}
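+// metadataForMergePatch is the minimal patch body used to set the podgroup annotation on a pod
+// via a strategic merge patch.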
+type metadataForMergePatch struct {
+ Metadata annotationForMergePatch `json:"metadata"`
+}
+
+type annotationForMergePatch struct {
+ Annotations map[string]string `json:"annotations"`
+}
+
func (pg *pgcontroller) addPod(obj interface{}) {
pod, ok := obj.(*v1.Pod)
if !ok {
@@ -57,20 +67,30 @@ func (pg *pgcontroller) updatePodAnnotations(pod *v1.Pod, pgName string) error {
pod.Annotations = make(map[string]string)
}
if pod.Annotations[scheduling.KubeGroupNameAnnotationKey] == "" {
- pod.Annotations[scheduling.KubeGroupNameAnnotationKey] = pgName
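+ // Patch only metadata.annotations with a strategic merge patch instead of updating the whole
+ // Pod object, so the write cannot conflict with or overwrite concurrent changes to other fields.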
+ patch := metadataForMergePatch{
+ Metadata: annotationForMergePatch{
+ Annotations: map[string]string{
+ scheduling.KubeGroupNameAnnotationKey: pgName,
+ },
+ },
+ }
+
+ patchBytes, err := json.Marshal(&patch)
+ if err != nil {
+ klog.Errorf("Failed to json.Marshal pod annotation: %v", err)
+ return err
+ }
+
+ if _, err := pg.kubeClient.CoreV1().Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}); err != nil {
+ klog.Errorf("Failed to update pod <%s/%s>: %v", pod.Namespace, pod.Name, err)
+ return err
+ }
} else {
if pod.Annotations[scheduling.KubeGroupNameAnnotationKey] != pgName {
klog.Errorf("normal pod %s/%s annotations %s value is not %s, but %s", pod.Namespace, pod.Name,
scheduling.KubeGroupNameAnnotationKey, pgName, pod.Annotations[scheduling.KubeGroupNameAnnotationKey])
}
- return nil
- }
-
- if _, err := pg.kubeClient.CoreV1().Pods(pod.Namespace).Update(context.TODO(), pod, metav1.UpdateOptions{}); err != nil {
- klog.Errorf("Failed to update pod <%s/%s>: %v", pod.Namespace, pod.Name, err)
- return err
}
-
return nil
}
diff --git a/pkg/controllers/podgroup/pg_controller_test.go b/pkg/controllers/podgroup/pg_controller_test.go
index a79021d9b43..ecb9f5cf173 100644
--- a/pkg/controllers/podgroup/pg_controller_test.go
+++ b/pkg/controllers/podgroup/pg_controller_test.go
@@ -170,7 +170,12 @@ func TestAddPodGroup(t *testing.T) {
t.Errorf("Case %s failed, expect %v, got %v", testCase.name, testCase.expectedPodGroup, pg)
}
- podAnnotation := pod.Annotations[scheduling.KubeGroupNameAnnotationKey]
+ newpod, err := c.kubeClient.CoreV1().Pods(testCase.pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
+ if err != nil {
+ t.Errorf("Case %s failed when creating pod for %v", testCase.name, err)
+ }
+
+ podAnnotation := newpod.Annotations[scheduling.KubeGroupNameAnnotationKey]
if testCase.expectedPodGroup.Name != podAnnotation {
t.Errorf("Case %s failed, expect %v, got %v", testCase.name,
testCase.expectedPodGroup.Name, podAnnotation)
diff --git a/pkg/scheduler/api/cluster_info.go b/pkg/scheduler/api/cluster_info.go
index a9a3fa1f447..53569a88407 100644
--- a/pkg/scheduler/api/cluster_info.go
+++ b/pkg/scheduler/api/cluster_info.go
@@ -28,6 +28,7 @@ type ClusterInfo struct {
NamespaceInfo map[NamespaceName]*NamespaceInfo
RevocableNodes map[string]*NodeInfo
NodeList []string
+ CSINodesStatus map[string]*CSINodeStatusInfo
}
func (ci ClusterInfo) String() string {
diff --git a/pkg/scheduler/api/device_info.go b/pkg/scheduler/api/device_info.go
index 369d48bab99..cfb35bb332e 100644
--- a/pkg/scheduler/api/device_info.go
+++ b/pkg/scheduler/api/device_info.go
@@ -60,17 +60,29 @@ func (g *GPUDevice) isIdleGPU() bool {
// GetGPUMemoryOfPod returns the GPU memory required by the pod.
func GetGPUMemoryOfPod(pod *v1.Pod) uint {
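+ // Init containers run one at a time, so they never need more GPU memory than the largest single
+ // init container request; regular containers run concurrently, so their requests are summed.
+ // The effective pod requirement is the larger of the two values.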
+ var initMem uint
+ for _, container := range pod.Spec.InitContainers {
+ res := GetGPUMemoryOfContainer(container.Resources)
+ if initMem < res {
+ initMem = res
+ }
+ }
+
var mem uint
for _, container := range pod.Spec.Containers {
- mem += GetGPUMemoryOfContainer(&container)
+ mem += GetGPUMemoryOfContainer(container.Resources)
}
- return mem
+
+ if mem > initMem {
+ return mem
+ }
+ return initMem
}
-// GetGPUMemoryOfPod returns the GPU memory required by the container.
+// GetGPUMemoryOfContainer returns the GPU memory required by the container.
-func GetGPUMemoryOfContainer(container *v1.Container) uint {
+func GetGPUMemoryOfContainer(resources v1.ResourceRequirements) uint {
var mem uint
- if val, ok := container.Resources.Limits[VolcanoGPUResource]; ok {
+ if val, ok := resources.Limits[VolcanoGPUResource]; ok {
mem = uint(val.Value())
}
return mem
diff --git a/pkg/scheduler/api/device_info_test.go b/pkg/scheduler/api/device_info_test.go
new file mode 100644
index 00000000000..1c7f4eee784
--- /dev/null
+++ b/pkg/scheduler/api/device_info_test.go
@@ -0,0 +1,99 @@
+/*
+Copyright 2022 The Volcano Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package api
+
+import (
+ "testing"
+
+ v1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/api/resource"
+)
+
+func TestGetGPUResourceOfPod(t *testing.T) {
+ testCases := []struct {
+ name string
+ pod *v1.Pod
+ want uint
+ }{
+ {
+ name: "GPUs required only in Containers",
+ pod: &v1.Pod{
+ Spec: v1.PodSpec{
+ Containers: []v1.Container{
+ {
+ Resources: v1.ResourceRequirements{
+ Limits: v1.ResourceList{
+ VolcanoGPUResource: resource.MustParse("1"),
+ },
+ },
+ },
+ {
+ Resources: v1.ResourceRequirements{
+ Limits: v1.ResourceList{
+ VolcanoGPUResource: resource.MustParse("3"),
+ },
+ },
+ },
+ },
+ },
+ },
+ want: 4,
+ },
+ {
+ name: "GPUs required both in initContainers and Containers",
+ pod: &v1.Pod{
+ Spec: v1.PodSpec{
+ InitContainers: []v1.Container{
+ {
+ Resources: v1.ResourceRequirements{
+ Limits: v1.ResourceList{
+ VolcanoGPUResource: resource.MustParse("1"),
+ },
+ },
+ },
+ {
+ Resources: v1.ResourceRequirements{
+ Limits: v1.ResourceList{
+ VolcanoGPUResource: resource.MustParse("3"),
+ },
+ },
+ },
+ },
+ Containers: []v1.Container{
+ {
+ Resources: v1.ResourceRequirements{
+ Limits: v1.ResourceList{
+ VolcanoGPUResource: resource.MustParse("2"),
+ },
+ },
+ },
+ },
+ },
+ },
+ want: 3,
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ got := GetGPUResourceOfPod(tc.pod)
+ if tc.want != got {
+ t.Errorf("unexpected result, want: %v, got: %v", tc.want, got)
+ }
+ })
+ }
+}
diff --git a/pkg/scheduler/api/node_info.go b/pkg/scheduler/api/node_info.go
index 2e650a73e26..1017f7fb99a 100644
--- a/pkg/scheduler/api/node_info.go
+++ b/pkg/scheduler/api/node_info.go
@@ -35,6 +35,11 @@ func (o *AllocateFailError) Error() string {
return o.Reason
}
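+// CSINodeStatusInfo records, for each CSI driver on a node, whether the driver reports an
+// allocatable volume count.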
+type CSINodeStatusInfo struct {
+ CSINodeName string
+ DriverStatus map[string]bool
+}
+
// NodeInfo is node level aggregated information.
type NodeInfo struct {
Name string
@@ -622,3 +627,14 @@ func (ni *NodeInfo) getUnhealthyGPUs(node *v1.Node) (unhealthyGPUs []int) {
}
return
}
+
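+// Clone returns a deep copy of the CSINodeStatusInfo.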
+func (cs *CSINodeStatusInfo) Clone() *CSINodeStatusInfo {
+ newcs := &CSINodeStatusInfo{
+ CSINodeName: cs.CSINodeName,
+ DriverStatus: make(map[string]bool),
+ }
+ for k, v := range cs.DriverStatus {
+ newcs.DriverStatus[k] = v
+ }
+ return newcs
+}
diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go
index 6ed985584db..607ca44dc69 100644
--- a/pkg/scheduler/cache/cache.go
+++ b/pkg/scheduler/cache/cache.go
@@ -46,6 +46,7 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
+ "k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
@@ -63,6 +64,7 @@ import (
"volcano.sh/volcano/cmd/scheduler/app/options"
schedulingapi "volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/metrics"
+ commonutil "volcano.sh/volcano/pkg/util"
)
const (
@@ -83,8 +85,8 @@ func init() {
}
// New returns a Cache implementation.
-func New(config *rest.Config, schedulerName string, defaultQueue string, nodeSelectors []string) Cache {
- return newSchedulerCache(config, schedulerName, defaultQueue, nodeSelectors)
+func New(config *rest.Config, schedulerNames []string, defaultQueue string, nodeSelectors []string) Cache {
+ return newSchedulerCache(config, schedulerNames, defaultQueue, nodeSelectors)
}
// SchedulerCache cache for the kube batch
@@ -92,10 +94,11 @@ type SchedulerCache struct {
sync.Mutex
kubeClient *kubernetes.Clientset
+ restConfig *rest.Config
vcClient *vcclient.Clientset
defaultQueue string
- // schedulerName is the name for volcano scheduler
+ // schedulerNames are the scheduler names this volcano instance is responsible for
- schedulerName string
+ schedulerNames []string
nodeSelectorLabels map[string]string
metricsConf map[string]string
@@ -128,6 +131,7 @@ type SchedulerCache struct {
NodeList []string
defaultPriorityClass *schedulingv1.PriorityClass
defaultPriority int32
+ CSINodesStatus map[string]*schedulingapi.CSINodeStatusInfo
NamespaceCollection map[string]*schedulingapi.NamespaceCollection
@@ -146,7 +150,7 @@ type DefaultBinder struct {
// kubeclient *kubernetes.Clientset
}
-//Bind will send bind request to api server
+// Bind will send bind request to api server
func (db *DefaultBinder) Bind(kubeClient *kubernetes.Clientset, tasks []*schedulingapi.TaskInfo) ([]*schedulingapi.TaskInfo, error) {
var errTasks []*schedulingapi.TaskInfo
for _, task := range tasks {
@@ -373,7 +377,7 @@ func (pgb *podgroupBinder) Bind(job *schedulingapi.JobInfo, cluster string) (*sc
return job, nil
}
-func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue string, nodeSelectors []string) *SchedulerCache {
+func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueue string, nodeSelectors []string) *SchedulerCache {
kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
panic(fmt.Sprintf("failed init kubeClient, with err: %v", err))
@@ -398,9 +402,22 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s
Weight: 1,
},
}
- if _, err := vcClient.SchedulingV1beta1().Queues().Create(context.TODO(), &defaultQue, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
- panic(fmt.Sprintf("failed init default queue, with err: %v", err))
+
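+ // Creating the default queue can fail transiently on a fresh cluster (e.g. the Queue CRD is not
+ // yet established), so retry for up to ~60s; an AlreadyExists error stops the retry immediately.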
+ err = retry.OnError(wait.Backoff{
+ Steps: 60,
+ Duration: time.Second,
+ Factor: 1,
+ Jitter: 0.1,
+ }, func(err error) bool {
+ return !apierrors.IsAlreadyExists(err)
+ }, func() error {
+ _, err := vcClient.SchedulingV1beta1().Queues().Create(context.TODO(), &defaultQue, metav1.CreateOptions{})
+ return err
+ })
+ if err != nil && !apierrors.IsAlreadyExists(err) {
+ panic(fmt.Errorf("failed init default queue, with err: %v", err))
}
+ klog.Infof("Create init queue named default")
sc := &SchedulerCache{
Jobs: make(map[schedulingapi.JobID]*schedulingapi.JobInfo),
@@ -411,10 +428,12 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s
DeletedJobs: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
kubeClient: kubeClient,
vcClient: vcClient,
+ restConfig: config,
defaultQueue: defaultQueue,
- schedulerName: schedulerName,
+ schedulerNames: schedulerNames,
nodeSelectorLabels: make(map[string]string),
NamespaceCollection: make(map[string]*schedulingapi.NamespaceCollection),
+ CSINodesStatus: make(map[string]*schedulingapi.CSINodeStatusInfo),
NodeList: []string{},
}
@@ -438,7 +457,7 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s
// Prepare event clients.
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: eventClient.CoreV1().Events("")})
- sc.Recorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: schedulerName})
+ sc.Recorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: commonutil.GenerateComponentName(sc.schedulerNames)})
sc.BindFlowChannel = make(chan *schedulingapi.TaskInfo, 5000)
sc.Binder = GetBindMethod()
@@ -509,6 +528,13 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s
sc.pvInformer = informerFactory.Core().V1().PersistentVolumes()
sc.scInformer = informerFactory.Storage().V1().StorageClasses()
sc.csiNodeInformer = informerFactory.Storage().V1().CSINodes()
+ sc.csiNodeInformer.Informer().AddEventHandler(
+ cache.ResourceEventHandlerFuncs{
+ AddFunc: sc.AddOrUpdateCSINode,
+ UpdateFunc: sc.UpdateCSINode,
+ DeleteFunc: sc.DeleteCSINode,
+ },
+ )
sc.csiDriverInformer = informerFactory.Storage().V1().CSIDrivers()
sc.csiStorageCapacityInformer = informerFactory.Storage().V1beta1().CSIStorageCapacities()
@@ -542,7 +568,7 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s
FilterFunc: func(obj interface{}) bool {
switch v := obj.(type) {
case *v1.Pod:
- if !responsibleForPod(v, schedulerName, mySchedulerPodName, c) {
+ if !responsibleForPod(v, schedulerNames, mySchedulerPodName, c) {
if len(v.Spec.NodeName) == 0 {
return false
}
@@ -771,6 +797,11 @@ func (sc *SchedulerCache) Client() kubernetes.Interface {
return sc.kubeClient
}
+// ClientConfig returns the rest config
+func (sc *SchedulerCache) ClientConfig() *rest.Config {
+ return sc.restConfig
+}
+
// SharedInformerFactory returns the scheduler SharedInformerFactory
func (sc *SchedulerCache) SharedInformerFactory() informers.SharedInformerFactory {
return sc.informerFactory
@@ -993,6 +1024,7 @@ func (sc *SchedulerCache) Snapshot() *schedulingapi.ClusterInfo {
NamespaceInfo: make(map[schedulingapi.NamespaceName]*schedulingapi.NamespaceInfo),
RevocableNodes: make(map[string]*schedulingapi.NodeInfo),
NodeList: make([]string, len(sc.NodeList)),
+ CSINodesStatus: make(map[string]*schedulingapi.CSINodeStatusInfo),
}
copy(snapshot.NodeList, sc.NodeList)
@@ -1000,6 +1032,10 @@ func (sc *SchedulerCache) Snapshot() *schedulingapi.ClusterInfo {
value.RefreshNumaSchedulerInfoByCrd()
}
+ for _, value := range sc.CSINodesStatus {
+ snapshot.CSINodesStatus[value.CSINodeName] = value.Clone()
+ }
+
for _, value := range sc.Nodes {
if !value.Ready() {
continue
diff --git a/pkg/scheduler/cache/cache_test.go b/pkg/scheduler/cache/cache_test.go
index 936d73d202e..e90271659de 100644
--- a/pkg/scheduler/cache/cache_test.go
+++ b/pkg/scheduler/cache/cache_test.go
@@ -105,9 +105,9 @@ func TestGetOrCreateJob(t *testing.T) {
pi3 := api.NewTaskInfo(pod3)
cache := &SchedulerCache{
- Nodes: make(map[string]*api.NodeInfo),
- Jobs: make(map[api.JobID]*api.JobInfo),
- schedulerName: "volcano",
+ Nodes: make(map[string]*api.NodeInfo),
+ Jobs: make(map[api.JobID]*api.JobInfo),
+ schedulerNames: []string{"volcano"},
}
tests := []struct {
diff --git a/pkg/scheduler/cache/event_handlers.go b/pkg/scheduler/cache/event_handlers.go
index 07ed8cafc8d..56606ecc7b7 100644
--- a/pkg/scheduler/cache/event_handlers.go
+++ b/pkg/scheduler/cache/event_handlers.go
@@ -19,6 +19,7 @@ package cache
import (
"context"
"fmt"
+ "reflect"
"strconv"
v1 "k8s.io/api/core/v1"
@@ -30,6 +31,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
+ sv1 "k8s.io/api/storage/v1"
nodeinfov1alpha1 "volcano.sh/apis/pkg/apis/nodeinfo/v1alpha1"
"volcano.sh/apis/pkg/apis/scheduling"
"volcano.sh/apis/pkg/apis/scheduling/scheme"
@@ -37,6 +39,7 @@ import (
"volcano.sh/apis/pkg/apis/utils"
schedulingapi "volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/metrics"
+ commonutil "volcano.sh/volcano/pkg/util"
)
func isTerminated(status schedulingapi.TaskStatus) bool {
@@ -47,9 +50,9 @@ func isTerminated(status schedulingapi.TaskStatus) bool {
// pi.Pod.Spec.SchedulerName is same as volcano scheduler's name, otherwise it will return nil.
func (sc *SchedulerCache) getOrCreateJob(pi *schedulingapi.TaskInfo) *schedulingapi.JobInfo {
if len(pi.Job) == 0 {
- if pi.Pod.Spec.SchedulerName != sc.schedulerName {
- klog.V(4).Infof("Pod %s/%s will not scheduled by %s, skip creating PodGroup and Job for it",
- pi.Pod.Namespace, pi.Pod.Name, sc.schedulerName)
+ if !commonutil.Contains(sc.schedulerNames, pi.Pod.Spec.SchedulerName) {
+ klog.V(4).Infof("Pod %s/%s will not be scheduled by %#v, skip creating PodGroup and Job for it",
+ pi.Pod.Namespace, pi.Pod.Name, sc.schedulerNames)
}
return nil
}
@@ -402,6 +405,69 @@ func (sc *SchedulerCache) DeleteNode(obj interface{}) {
}
}
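+// AddOrUpdateCSINode records, for each driver on the CSINode, whether it reports an allocatable volume count.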
+func (sc *SchedulerCache) AddOrUpdateCSINode(obj interface{}) {
+ csiNode, ok := obj.(*sv1.CSINode)
+ if !ok {
+ return
+ }
+
+ var csiNodeStatus *schedulingapi.CSINodeStatusInfo
+ var found bool
+ sc.Mutex.Lock()
+ defer sc.Mutex.Unlock()
+ // update the cached driver status for this CSINode
+
+ if csiNodeStatus, found = sc.CSINodesStatus[csiNode.Name]; !found {
+ csiNodeStatus = &schedulingapi.CSINodeStatusInfo{
+ CSINodeName: csiNode.Name,
+ DriverStatus: make(map[string]bool),
+ }
+ sc.CSINodesStatus[csiNode.Name] = csiNodeStatus
+ }
+
+ for i := range csiNode.Spec.Drivers {
+ d := csiNode.Spec.Drivers[i]
+ csiNodeStatus.DriverStatus[d.Name] = d.Allocatable != nil && d.Allocatable.Count != nil
+ }
+}
+
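+// UpdateCSINode re-syncs the cached status when a CSINode's spec changes.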
+func (sc *SchedulerCache) UpdateCSINode(oldObj, newObj interface{}) {
+ oldCSINode, ok := oldObj.(*sv1.CSINode)
+ if !ok {
+ return
+ }
+ newCSINode, ok := newObj.(*sv1.CSINode)
+ if !ok {
+ return
+ }
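+ // nothing to do when the spec is unchanged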
+ if reflect.DeepEqual(oldCSINode.Spec, newCSINode.Spec) {
+ return
+ }
+ sc.AddOrUpdateCSINode(newObj)
+}
+
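+// DeleteCSINode removes the CSINode's cached status; it also handles tombstone objects from the informer.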
+func (sc *SchedulerCache) DeleteCSINode(obj interface{}) {
+ var csiNode *sv1.CSINode
+ switch t := obj.(type) {
+ case *sv1.CSINode:
+ csiNode = t
+ case cache.DeletedFinalStateUnknown:
+ var ok bool
+ csiNode, ok = t.Obj.(*sv1.CSINode)
+ if !ok {
+ klog.Errorf("Cannot convert to *sv1.CSINode: %v", obj)
+ return
+ }
+ default:
+ klog.Errorf("Cannot convert to *sv1.CSINode: %v", obj)
+ return
+ }
+
+ sc.Mutex.Lock()
+ delete(sc.CSINodesStatus, csiNode.Name)
+ sc.Mutex.Unlock()
+}
+
func getJobID(pg *schedulingapi.PodGroup) schedulingapi.JobID {
return schedulingapi.JobID(fmt.Sprintf("%s/%s", pg.Namespace, pg.Name))
}
diff --git a/pkg/scheduler/cache/interface.go b/pkg/scheduler/cache/interface.go
index ffb76df633b..c46f71af888 100644
--- a/pkg/scheduler/cache/interface.go
+++ b/pkg/scheduler/cache/interface.go
@@ -20,6 +20,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
scheduling "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
@@ -70,6 +71,9 @@ type Cache interface {
// Client returns the kubernetes clientSet, which can be used by plugins
Client() kubernetes.Interface
+ // ClientConfig returns the rest config
+ ClientConfig() *rest.Config
+
UpdateSchedulerNumaInfo(sets map[string]api.ResNumaSets) error
// SharedInformerFactory return scheduler SharedInformerFactory
diff --git a/pkg/scheduler/cache/util.go b/pkg/scheduler/cache/util.go
index 5bf3dbf59b9..87ebd10bfaa 100644
--- a/pkg/scheduler/cache/util.go
+++ b/pkg/scheduler/cache/util.go
@@ -27,13 +27,14 @@ import (
"stathat.com/c/consistent"
scheduling "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
+ commonutil "volcano.sh/volcano/pkg/util"
)
// responsibleForPod returns false at following conditions:
// 1. The current scheduler is not specified scheduler in Pod's spec.
// 2. The Job which the Pod belongs is not assigned to current scheduler based on the hash algorithm in multi-schedulers scenario
-func responsibleForPod(pod *v1.Pod, schedulerName string, mySchedulerPodName string, c *consistent.Consistent) bool {
- if schedulerName != pod.Spec.SchedulerName {
+func responsibleForPod(pod *v1.Pod, schedulerNames []string, mySchedulerPodName string, c *consistent.Consistent) bool {
+ if !commonutil.Contains(schedulerNames, pod.Spec.SchedulerName) {
return false
}
if c != nil {
diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go
index 5bf817ff617..8317ecfa728 100644
--- a/pkg/scheduler/framework/session.go
+++ b/pkg/scheduler/framework/session.go
@@ -25,6 +25,7 @@ import (
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
"volcano.sh/apis/pkg/apis/scheduling"
@@ -45,6 +46,7 @@ type Session struct {
kubeClient kubernetes.Interface
recorder record.EventRecorder
cache cache.Cache
+ restConfig *rest.Config
informerFactory informers.SharedInformerFactory
TotalResource *api.Resource
@@ -54,6 +56,7 @@ type Session struct {
Jobs map[api.JobID]*api.JobInfo
Nodes map[string]*api.NodeInfo
+ CSINodesStatus map[string]*api.CSINodeStatusInfo
RevocableNodes map[string]*api.NodeInfo
Queues map[api.QueueID]*api.QueueInfo
NamespaceInfo map[api.NamespaceName]*api.NamespaceInfo
@@ -94,6 +97,7 @@ func openSession(cache cache.Cache) *Session {
ssn := &Session{
UID: uuid.NewUUID(),
kubeClient: cache.Client(),
+ restConfig: cache.ClientConfig(),
recorder: cache.EventRecorder(),
cache: cache,
informerFactory: cache.SharedInformerFactory(),
@@ -103,6 +107,7 @@ func openSession(cache cache.Cache) *Session {
Jobs: map[api.JobID]*api.JobInfo{},
Nodes: map[string]*api.NodeInfo{},
+ CSINodesStatus: map[string]*api.CSINodeStatusInfo{},
RevocableNodes: map[string]*api.NodeInfo{},
Queues: map[api.QueueID]*api.QueueInfo{},
@@ -163,6 +168,7 @@ func openSession(cache cache.Cache) *Session {
}
ssn.NodeList = util.GetNodeList(snapshot.Nodes, snapshot.NodeList)
ssn.Nodes = snapshot.Nodes
+ ssn.CSINodesStatus = snapshot.CSINodesStatus
ssn.RevocableNodes = snapshot.RevocableNodes
ssn.Queues = snapshot.Queues
ssn.NamespaceInfo = snapshot.NamespaceInfo
@@ -466,6 +472,11 @@ func (ssn Session) KubeClient() kubernetes.Interface {
return ssn.kubeClient
}
+// ClientConfig returns the rest config
+func (ssn Session) ClientConfig() *rest.Config {
+ return ssn.restConfig
+}
+
// InformerFactory returns the scheduler ShareInformerFactory
func (ssn Session) InformerFactory() informers.SharedInformerFactory {
return ssn.informerFactory
diff --git a/pkg/scheduler/plugins/OWNERS b/pkg/scheduler/plugins/OWNERS
new file mode 100644
index 00000000000..bdeefe7e045
--- /dev/null
+++ b/pkg/scheduler/plugins/OWNERS
@@ -0,0 +1,17 @@
+approvers:
+ - k82cn
+ - animeshsingh
+ - Thor-wl
+ - shinytang6
+ - zen-xu
+ - qiankunli
+reviewers:
+ - k82cn
+ - animeshsingh
+ - william-wang
+ - Thor-wl
+ - alcorj-mizar
+ - hudson741
+ - zen-xu
+ - shinytang6
+ - merryzhou
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 9098921fa4e..4fb751da2d2 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -53,7 +53,7 @@ type Scheduler struct {
// NewScheduler returns a scheduler
func NewScheduler(
config *rest.Config,
- schedulerName string,
+ schedulerNames []string,
schedulerConf string,
period time.Duration,
defaultQueue string,
@@ -72,7 +72,7 @@ func NewScheduler(
scheduler := &Scheduler{
schedulerConf: schedulerConf,
fileWatcher: watcher,
- cache: schedcache.New(config, schedulerName, defaultQueue, nodeSelectors),
+ cache: schedcache.New(config, schedulerNames, defaultQueue, nodeSelectors),
schedulePeriod: period,
}
diff --git a/pkg/util/util.go b/pkg/util/util.go
new file mode 100644
index 00000000000..6688f290511
--- /dev/null
+++ b/pkg/util/util.go
@@ -0,0 +1,34 @@
+package util
+
+const (
+ defaultSchedulerName = "volcano"
+)
+
+// Contains checks whether the slice contains the given element
+func Contains(slice []string, element string) bool {
+ for _, item := range slice {
+ if item == element {
+ return true
+ }
+ }
+ return false
+}
+
+// GenerateComponentName generates the scheduler component name
+func GenerateComponentName(schedulerNames []string) string {
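+ // return the single configured scheduler name; otherwise fall back to the default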
+ if len(schedulerNames) == 1 {
+ return schedulerNames[0]
+ }
+
+ return defaultSchedulerName
+}
+
+// GenerateSchedulerName generates the scheduler name for a volcano job
+func GenerateSchedulerName(schedulerNames []string) string {
+ // choose the first scheduler name for the volcano job when its schedulerName is empty
+ if len(schedulerNames) > 0 {
+ return schedulerNames[0]
+ }
+
+ return defaultSchedulerName
+}
diff --git a/pkg/webhooks/OWNERS b/pkg/webhooks/OWNERS
new file mode 100644
index 00000000000..c741d7afcc8
--- /dev/null
+++ b/pkg/webhooks/OWNERS
@@ -0,0 +1,22 @@
+reviewers:
+ - k82cn
+ - kevin-wangzefeng
+ - animeshsingh
+ - william-wang
+ - Thor-wl
+ - merryzhou
+ - wpeng102
+ - shinytang6
+ - huone1
+ - jasonliu747
+ - qiankunli
+ - hwdef
+approvers:
+ - k82cn
+ - kevin-wangzefeng
+ - animeshsingh
+ - william-wang
+ - Thor-wl
+ - shinytang6
+ - wpeng102
+ - hwdef
diff --git a/pkg/webhooks/admission/jobs/mutate/mutate_job.go b/pkg/webhooks/admission/jobs/mutate/mutate_job.go
index 36798ab653f..fbd8aeccf13 100644
--- a/pkg/webhooks/admission/jobs/mutate/mutate_job.go
+++ b/pkg/webhooks/admission/jobs/mutate/mutate_job.go
@@ -27,7 +27,10 @@ import (
"k8s.io/klog"
"volcano.sh/apis/pkg/apis/batch/v1alpha1"
- controllerMpi "volcano.sh/volcano/pkg/controllers/job/plugins/distributed-framework/mpi"
+ "volcano.sh/volcano/pkg/controllers/job/plugins/distributed-framework/mpi"
+ "volcano.sh/volcano/pkg/controllers/job/plugins/distributed-framework/pytorch"
+ "volcano.sh/volcano/pkg/controllers/job/plugins/distributed-framework/tensorflow"
+ commonutil "volcano.sh/volcano/pkg/util"
"volcano.sh/volcano/pkg/webhooks/router"
"volcano.sh/volcano/pkg/webhooks/schema"
"volcano.sh/volcano/pkg/webhooks/util"
@@ -39,8 +42,6 @@ const (
// DefaultMaxRetry is the default number of retries.
DefaultMaxRetry = 3
- defaultSchedulerName = "volcano"
-
defaultMaxRetry int32 = 3
)
@@ -52,6 +53,8 @@ var service = &router.AdmissionService{
Path: "/jobs/mutate",
Func: Jobs,
+ Config: config,
+
MutatingConfig: &whv1.MutatingWebhookConfiguration{
Webhooks: []whv1.MutatingWebhook{{
Name: "mutatejob.volcano.sh",
@@ -69,6 +72,8 @@ var service = &router.AdmissionService{
},
}
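+// config carries the admission configuration for this webhook, including the configured scheduler names.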
+var config = &router.AdmissionServiceConfig{}
+
type patchOperation struct {
Op string `json:"op"`
Path string `json:"path"`
@@ -147,7 +152,7 @@ func patchDefaultQueue(job *v1alpha1.Job) *patchOperation {
func patchDefaultScheduler(job *v1alpha1.Job) *patchOperation {
// Add default scheduler name if not specified.
if job.Spec.SchedulerName == "" {
- return &patchOperation{Op: "add", Path: "/spec/schedulerName", Value: defaultSchedulerName}
+ return &patchOperation{Op: "add", Path: "/spec/schedulerName", Value: commonutil.GenerateSchedulerName(config.SchedulerNames)}
}
return nil
}
@@ -179,7 +184,7 @@ func patchDefaultMinAvailable(job *v1alpha1.Job) *patchOperation {
func mutateSpec(tasks []v1alpha1.TaskSpec, basePath string, job *v1alpha1.Job) *patchOperation {
// TODO: Enable this configuration when dependOn supports coexistence with the gang plugin
- // if _, ok := job.Spec.Plugins[controllerMpi.MpiPluginName]; ok {
+ // if _, ok := job.Spec.Plugins[mpi.MpiPluginName]; ok {
// mpi.AddDependsOn(job)
// }
patched := false
@@ -228,9 +233,10 @@ func patchDefaultPlugins(job *v1alpha1.Job) *patchOperation {
// Because the tensorflow-plugin and mpi-plugin depends on svc-plugin.
// If the svc-plugin is not defined, we should add it.
- _, hasTf := job.Spec.Plugins["tensorflow"]
- _, hasMPI := job.Spec.Plugins[controllerMpi.MPIPluginName]
- if hasTf || hasMPI {
+ _, hasTf := job.Spec.Plugins[tensorflow.TFPluginName]
+ _, hasMPI := job.Spec.Plugins[mpi.MPIPluginName]
+ _, hasPytorch := job.Spec.Plugins[pytorch.PytorchPluginName]
+ if hasTf || hasMPI || hasPytorch {
if _, ok := plugins["svc"]; !ok {
plugins["svc"] = []string{}
}
diff --git a/pkg/webhooks/admission/pods/validate/admit_pod.go b/pkg/webhooks/admission/pods/validate/admit_pod.go
index e5de2e416be..4c442cec8cd 100644
--- a/pkg/webhooks/admission/pods/validate/admit_pod.go
+++ b/pkg/webhooks/admission/pods/validate/admit_pod.go
@@ -32,6 +32,7 @@ import (
"volcano.sh/apis/pkg/apis/helpers"
vcv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
+ commonutil "volcano.sh/volcano/pkg/util"
"volcano.sh/volcano/pkg/webhooks/router"
"volcano.sh/volcano/pkg/webhooks/schema"
"volcano.sh/volcano/pkg/webhooks/util"
@@ -100,10 +101,9 @@ allow pods to create when
3. check pod budget annotations configure
*/
func validatePod(pod *v1.Pod, reviewResponse *admissionv1.AdmissionResponse) string {
- if pod.Spec.SchedulerName != config.SchedulerName {
+ if !commonutil.Contains(config.SchedulerNames, pod.Spec.SchedulerName) {
return ""
}
-
pgName := ""
msg := ""
diff --git a/pkg/webhooks/admission/pods/validate/admit_pod_test.go b/pkg/webhooks/admission/pods/validate/admit_pod_test.go
index fc69f6f4129..9c733325608 100644
--- a/pkg/webhooks/admission/pods/validate/admit_pod_test.go
+++ b/pkg/webhooks/admission/pods/validate/admit_pod_test.go
@@ -260,7 +260,7 @@ func TestValidatePod(t *testing.T) {
// create fake volcano clientset
config.VolcanoClient = vcclient.NewSimpleClientset()
- config.SchedulerName = "volcano"
+ config.SchedulerNames = []string{"volcano"}
if !testCase.disabledPG {
_, err := config.VolcanoClient.SchedulingV1beta1().PodGroups(namespace).Create(context.TODO(), pg, metav1.CreateOptions{})
diff --git a/pkg/webhooks/router/interface.go b/pkg/webhooks/router/interface.go
index a4906eff0d8..f6fd4aeecca 100644
--- a/pkg/webhooks/router/interface.go
+++ b/pkg/webhooks/router/interface.go
@@ -30,11 +30,11 @@ import (
type AdmitFunc func(admissionv1.AdmissionReview) *admissionv1.AdmissionResponse
type AdmissionServiceConfig struct {
- SchedulerName string
- KubeClient kubernetes.Interface
- VolcanoClient versioned.Interface
- Recorder record.EventRecorder
- ConfigData *config.AdmissionConfiguration
+ SchedulerNames []string
+ KubeClient kubernetes.Interface
+ VolcanoClient versioned.Interface
+ Recorder record.EventRecorder
+ ConfigData *config.AdmissionConfiguration
}
type AdmissionService struct {
diff --git a/test/e2e/jobseq/pytorch_plugin.go b/test/e2e/jobseq/pytorch_plugin.go
new file mode 100644
index 00000000000..a1b7f37607d
--- /dev/null
+++ b/test/e2e/jobseq/pytorch_plugin.go
@@ -0,0 +1,58 @@
+package jobseq
+
+import (
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+ vcbatch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
+ vcbus "volcano.sh/apis/pkg/apis/bus/v1alpha1"
+ e2eutil "volcano.sh/volcano/test/e2e/util"
+)
+
+var _ = Describe("Pytorch Plugin E2E Test", func() {
+ It("will run and complete finally", func() {
+ context := e2eutil.InitTestContext(e2eutil.Options{})
+ defer e2eutil.CleanupTestContext(context)
+
+ slot := e2eutil.OneCPU
+
+ spec := &e2eutil.JobSpec{
+ Name: "pytorch-job",
+ Min: 1,
+ Policies: []vcbatch.LifecyclePolicy{
+ {
+ Action: vcbus.CompleteJobAction,
+ Event: vcbus.TaskCompletedEvent,
+ },
+ },
+ Plugins: map[string][]string{
+ "pytorch": {"--master=master", "--worker=worker", "--port=23456"},
+ },
+ Tasks: []e2eutil.TaskSpec{
+ {
+ Name: "master",
+ Img: "docker.io/kubeflowkatib/pytorch-mnist:v1beta1-45c5727",
+ Req: slot,
+ Min: 1,
+ Rep: 1,
+ WorkingDir: "/home",
+ // Need some time to wait for the worker nodes to be ready
+ Command: `python3 /opt/pytorch-mnist/mnist.py --epochs=1`,
+ },
+ {
+ Name: "worker",
+ Img: "docker.io/kubeflowkatib/pytorch-mnist:v1beta1-45c5727",
+ Req: slot,
+ Min: 2,
+ Rep: 2,
+ WorkingDir: "/home",
+ Command: "python3 /opt/pytorch-mnist/mnist.py --epochs=1",
+ },
+ },
+ }
+
+ job := e2eutil.CreateJob(context, spec)
+ err := e2eutil.WaitJobPhases(context, job, []vcbatch.JobPhase{
+ vcbatch.Pending, vcbatch.Running, vcbatch.Completed})
+ Expect(err).NotTo(HaveOccurred())
+ })
+})
diff --git a/vendor/k8s.io/client-go/util/retry/OWNERS b/vendor/k8s.io/client-go/util/retry/OWNERS
new file mode 100644
index 00000000000..dec3e88d631
--- /dev/null
+++ b/vendor/k8s.io/client-go/util/retry/OWNERS
@@ -0,0 +1,4 @@
+# See the OWNERS docs at https://go.k8s.io/owners
+
+reviewers:
+- caesarxuchao
diff --git a/vendor/k8s.io/client-go/util/retry/util.go b/vendor/k8s.io/client-go/util/retry/util.go
new file mode 100644
index 00000000000..772f5bd7a77
--- /dev/null
+++ b/vendor/k8s.io/client-go/util/retry/util.go
@@ -0,0 +1,105 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package retry
+
+import (
+ "time"
+
+ "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/util/wait"
+)
+
+// DefaultRetry is the recommended retry for a conflict where multiple clients
+// are making changes to the same resource.
+var DefaultRetry = wait.Backoff{
+ Steps: 5,
+ Duration: 10 * time.Millisecond,
+ Factor: 1.0,
+ Jitter: 0.1,
+}
+
+// DefaultBackoff is the recommended backoff for a conflict where a client
+// may be attempting to make an unrelated modification to a resource under
+// active management by one or more controllers.
+var DefaultBackoff = wait.Backoff{
+ Steps: 4,
+ Duration: 10 * time.Millisecond,
+ Factor: 5.0,
+ Jitter: 0.1,
+}
+
+// OnError allows the caller to retry fn in case the error returned by fn is retriable
+// according to the provided function. backoff defines the maximum retries and the wait
+// interval between two retries.
+func OnError(backoff wait.Backoff, retriable func(error) bool, fn func() error) error {
+ var lastErr error
+ err := wait.ExponentialBackoff(backoff, func() (bool, error) {
+ err := fn()
+ switch {
+ case err == nil:
+ return true, nil
+ case retriable(err):
+ lastErr = err
+ return false, nil
+ default:
+ return false, err
+ }
+ })
+ if err == wait.ErrWaitTimeout {
+ err = lastErr
+ }
+ return err
+}
+
+// RetryOnConflict is used to make an update to a resource when you have to worry about
+// conflicts caused by other code making unrelated updates to the resource at the same
+// time. fn should fetch the resource to be modified, make appropriate changes to it, try
+// to update it, and return (unmodified) the error from the update function. On a
+// successful update, RetryOnConflict will return nil. If the update function returns a
+// "Conflict" error, RetryOnConflict will wait some amount of time as described by
+// backoff, and then try again. On a non-"Conflict" error, or if it retries too many times
+// and gives up, RetryOnConflict will return an error to the caller.
+//
+// err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
+// // Fetch the resource here; you need to refetch it on every try, since
+// // if you got a conflict on the last update attempt then you need to get
+// // the current version before making your own changes.
+// pod, err := c.Pods("mynamespace").Get(name, metav1.GetOptions{})
+// if err != nil {
+// return err
+// }
+//
+// // Make whatever updates to the resource are needed
+// pod.Status.Phase = v1.PodFailed
+//
+// // Try to update
+// _, err = c.Pods("mynamespace").UpdateStatus(pod)
+// // You have to return err itself here (not wrapped inside another error)
+// // so that RetryOnConflict can identify it correctly.
+// return err
+// })
+// if err != nil {
+// // May be conflict if max retries were hit, or may be something unrelated
+// // like permissions or a network error
+// return err
+// }
+// ...
+//
+// TODO: Make Backoff an interface?
+func RetryOnConflict(backoff wait.Backoff, fn func() error) error {
+ return OnError(backoff, errors.IsConflict, fn)
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 10e2edc34e8..32ecdcfaccd 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -857,6 +857,7 @@ k8s.io/client-go/util/flowcontrol
k8s.io/client-go/util/homedir
k8s.io/client-go/util/jsonpath
k8s.io/client-go/util/keyutil
+k8s.io/client-go/util/retry
k8s.io/client-go/util/workqueue
# k8s.io/cloud-provider v0.23.0 => k8s.io/cloud-provider v0.23.0
## explicit; go 1.16