From 9a19ea4004d4c803eaecbea517b1cef79751168f Mon Sep 17 00:00:00 2001 From: xuzhonghu Date: Fri, 12 Jun 2020 15:41:11 +0800 Subject: [PATCH] Add kubeClient in scheduler framwork.Session --- pkg/scheduler/cache/cache.go | 31 ++++++++++++++++----------- pkg/scheduler/cache/event_handlers.go | 2 +- pkg/scheduler/cache/interface.go | 4 ++++ pkg/scheduler/framework/session.go | 9 +++++--- pkg/scheduler/scheduler.go | 6 ++---- 5 files changed, 31 insertions(+), 21 deletions(-) diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 9a2c9ae0bd..79abe84ad7 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -67,15 +67,15 @@ func New(config *rest.Config, schedulerName string, defaultQueue string) Cache { return newSchedulerCache(config, schedulerName, defaultQueue) } -//SchedulerCache cache for the kube batch +// SchedulerCache cache for the kube batch type SchedulerCache struct { sync.Mutex - kubeclient *kubernetes.Clientset - vcclient *vcclient.Clientset + kubeClient *kubernetes.Clientset + vcClient *vcclient.Clientset defaultQueue string - // schedulerName is the name for kube batch scheduler + // schedulerName is the name for volcano scheduler schedulerName string podInformer infov1.PodInformer @@ -286,8 +286,8 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s PriorityClasses: make(map[string]*v1beta1.PriorityClass), errTasks: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), deletedJobs: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), - kubeclient: kubeClient, - vcclient: vcClient, + kubeClient: kubeClient, + vcClient: vcClient, defaultQueue: defaultQueue, schedulerName: schedulerName, @@ -300,27 +300,27 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s sc.Recorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: schedulerName}) sc.Binder = &defaultBinder{ - kubeclient: sc.kubeclient, + kubeclient: sc.kubeClient, } sc.Evictor = &defaultEvictor{ - kubeclient: sc.kubeclient, + kubeclient: sc.kubeClient, recorder: sc.Recorder, } sc.StatusUpdater = &defaultStatusUpdater{ - kubeclient: sc.kubeclient, - vcclient: sc.vcclient, + kubeclient: sc.kubeClient, + vcclient: sc.vcClient, } - informerFactory := informers.NewSharedInformerFactory(sc.kubeclient, 0) + informerFactory := informers.NewSharedInformerFactory(sc.kubeClient, 0) sc.pvcInformer = informerFactory.Core().V1().PersistentVolumeClaims() sc.pvInformer = informerFactory.Core().V1().PersistentVolumes() sc.scInformer = informerFactory.Storage().V1().StorageClasses() sc.VolumeBinder = &defaultVolumeBinder{ volumeBinder: volumescheduling.NewVolumeBinder( - sc.kubeclient, + sc.kubeClient, sc.nodeInformer, nil, sc.pvcInformer, @@ -379,7 +379,7 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s DeleteFunc: sc.DeleteResourceQuota, }) - vcinformers := vcinformer.NewSharedInformerFactory(sc.vcclient, 0) + vcinformers := vcinformer.NewSharedInformerFactory(sc.vcClient, 0) // create informer for PodGroup(v1beta1) information sc.podGroupInformerV1beta1 = vcinformers.Scheduling().V1beta1().PodGroups() @@ -592,6 +592,11 @@ func (sc *SchedulerCache) BindVolumes(task *schedulingapi.TaskInfo) error { return sc.VolumeBinder.BindVolumes(task) } +// Client returns the kubernetes clientSet +func (sc *SchedulerCache) Client() kubernetes.Interface { + return sc.kubeClient +} + // taskUnschedulable updates pod status of pending task func (sc *SchedulerCache) taskUnschedulable(task *schedulingapi.TaskInfo, message string) error { pod := task.Pod diff --git a/pkg/scheduler/cache/event_handlers.go b/pkg/scheduler/cache/event_handlers.go index 7c87265213..5aba6f7878 100644 --- a/pkg/scheduler/cache/event_handlers.go +++ b/pkg/scheduler/cache/event_handlers.go @@ -84,7 +84,7 @@ func (sc *SchedulerCache) addPod(pod *v1.Pod) error { } func (sc *SchedulerCache) syncTask(oldTask *schedulingapi.TaskInfo) error { - newPod, err := sc.kubeclient.CoreV1().Pods(oldTask.Namespace).Get(context.TODO(), oldTask.Name, metav1.GetOptions{}) + newPod, err := sc.kubeClient.CoreV1().Pods(oldTask.Namespace).Get(context.TODO(), oldTask.Name, metav1.GetOptions{}) if err != nil { if errors.IsNotFound(err) { err := sc.deleteTask(oldTask) diff --git a/pkg/scheduler/cache/interface.go b/pkg/scheduler/cache/interface.go index 4d52b05987..c06d27667f 100644 --- a/pkg/scheduler/cache/interface.go +++ b/pkg/scheduler/cache/interface.go @@ -18,6 +18,7 @@ package cache import ( v1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" "volcano.sh/volcano/pkg/scheduler/api" ) @@ -53,6 +54,9 @@ type Cache interface { // BindVolumes binds volumes to the task BindVolumes(task *api.TaskInfo) error + + // Client returns the kubernetes clientSet, which can be used by plugins + Client() kubernetes.Interface } // VolumeBinder interface for allocate and bind volumes diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go index 50a5bfc3bd..10064f9b56 100644 --- a/pkg/scheduler/framework/session.go +++ b/pkg/scheduler/framework/session.go @@ -23,6 +23,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/client-go/kubernetes" "k8s.io/klog" "volcano.sh/volcano/pkg/apis/scheduling" @@ -36,7 +37,8 @@ import ( type Session struct { UID types.UID - cache cache.Cache + kubeClient kubernetes.Interface + cache cache.Cache podGroupStatus map[api.JobID]*scheduling.PodGroupStatus @@ -72,8 +74,9 @@ type Session struct { func openSession(cache cache.Cache) *Session { ssn := &Session{ - UID: uuid.NewUUID(), - cache: cache, + UID: uuid.NewUUID(), + kubeClient: cache.Client(), + cache: cache, podGroupStatus: map[api.JobID]*scheduling.PodGroupStatus{}, diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 56c49b3869..7dae703a14 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -33,7 +33,6 @@ import ( // nodes that they fit on and writes bindings back to the api server. type Scheduler struct { cache schedcache.Cache - config *rest.Config actions []framework.Action plugins []conf.Tier configurations []conf.Configuration @@ -45,13 +44,12 @@ type Scheduler struct { func NewScheduler( config *rest.Config, schedulerName string, - conf string, + schedulerConf string, period time.Duration, defaultQueue string, ) (*Scheduler, error) { scheduler := &Scheduler{ - config: config, - schedulerConf: conf, + schedulerConf: schedulerConf, cache: schedcache.New(config, schedulerName, defaultQueue), schedulePeriod: period, }