Skip to content

Commit

Permalink
Merge pull request #860 from hzxuzhonghu/for-gpu
Browse files Browse the repository at this point in the history
Add kubeClient in scheduler framwork.Session
  • Loading branch information
volcano-sh-bot authored Jun 14, 2020
2 parents 758eb4b + 9a19ea4 commit 2db93ac
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 21 deletions.
31 changes: 18 additions & 13 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,15 @@ func New(config *rest.Config, schedulerName string, defaultQueue string) Cache {
return newSchedulerCache(config, schedulerName, defaultQueue)
}

//SchedulerCache cache for the kube batch
// SchedulerCache cache for the kube batch
type SchedulerCache struct {
sync.Mutex

kubeclient *kubernetes.Clientset
vcclient *vcclient.Clientset
kubeClient *kubernetes.Clientset
vcClient *vcclient.Clientset

defaultQueue string
// schedulerName is the name for kube batch scheduler
// schedulerName is the name for volcano scheduler
schedulerName string

podInformer infov1.PodInformer
Expand Down Expand Up @@ -286,8 +286,8 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s
PriorityClasses: make(map[string]*v1beta1.PriorityClass),
errTasks: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
deletedJobs: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
kubeclient: kubeClient,
vcclient: vcClient,
kubeClient: kubeClient,
vcClient: vcClient,
defaultQueue: defaultQueue,
schedulerName: schedulerName,

Expand All @@ -300,27 +300,27 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s
sc.Recorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: schedulerName})

sc.Binder = &defaultBinder{
kubeclient: sc.kubeclient,
kubeclient: sc.kubeClient,
}

sc.Evictor = &defaultEvictor{
kubeclient: sc.kubeclient,
kubeclient: sc.kubeClient,
recorder: sc.Recorder,
}

sc.StatusUpdater = &defaultStatusUpdater{
kubeclient: sc.kubeclient,
vcclient: sc.vcclient,
kubeclient: sc.kubeClient,
vcclient: sc.vcClient,
}

informerFactory := informers.NewSharedInformerFactory(sc.kubeclient, 0)
informerFactory := informers.NewSharedInformerFactory(sc.kubeClient, 0)

sc.pvcInformer = informerFactory.Core().V1().PersistentVolumeClaims()
sc.pvInformer = informerFactory.Core().V1().PersistentVolumes()
sc.scInformer = informerFactory.Storage().V1().StorageClasses()
sc.VolumeBinder = &defaultVolumeBinder{
volumeBinder: volumescheduling.NewVolumeBinder(
sc.kubeclient,
sc.kubeClient,
sc.nodeInformer,
nil,
sc.pvcInformer,
Expand Down Expand Up @@ -379,7 +379,7 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s
DeleteFunc: sc.DeleteResourceQuota,
})

vcinformers := vcinformer.NewSharedInformerFactory(sc.vcclient, 0)
vcinformers := vcinformer.NewSharedInformerFactory(sc.vcClient, 0)

// create informer for PodGroup(v1beta1) information
sc.podGroupInformerV1beta1 = vcinformers.Scheduling().V1beta1().PodGroups()
Expand Down Expand Up @@ -592,6 +592,11 @@ func (sc *SchedulerCache) BindVolumes(task *schedulingapi.TaskInfo) error {
return sc.VolumeBinder.BindVolumes(task)
}

// Client returns the kubernetes clientSet
func (sc *SchedulerCache) Client() kubernetes.Interface {
return sc.kubeClient
}

// taskUnschedulable updates pod status of pending task
func (sc *SchedulerCache) taskUnschedulable(task *schedulingapi.TaskInfo, message string) error {
pod := task.Pod
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/cache/event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (sc *SchedulerCache) addPod(pod *v1.Pod) error {
}

func (sc *SchedulerCache) syncTask(oldTask *schedulingapi.TaskInfo) error {
newPod, err := sc.kubeclient.CoreV1().Pods(oldTask.Namespace).Get(context.TODO(), oldTask.Name, metav1.GetOptions{})
newPod, err := sc.kubeClient.CoreV1().Pods(oldTask.Namespace).Get(context.TODO(), oldTask.Name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
err := sc.deleteTask(oldTask)
Expand Down
4 changes: 4 additions & 0 deletions pkg/scheduler/cache/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package cache

import (
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"

"volcano.sh/volcano/pkg/scheduler/api"
)
Expand Down Expand Up @@ -53,6 +54,9 @@ type Cache interface {

// BindVolumes binds volumes to the task
BindVolumes(task *api.TaskInfo) error

// Client returns the kubernetes clientSet, which can be used by plugins
Client() kubernetes.Interface
}

// VolumeBinder interface for allocate and bind volumes
Expand Down
9 changes: 6 additions & 3 deletions pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/kubernetes"
"k8s.io/klog"

"volcano.sh/volcano/pkg/apis/scheduling"
Expand All @@ -36,7 +37,8 @@ import (
type Session struct {
UID types.UID

cache cache.Cache
kubeClient kubernetes.Interface
cache cache.Cache

podGroupStatus map[api.JobID]*scheduling.PodGroupStatus

Expand Down Expand Up @@ -72,8 +74,9 @@ type Session struct {

func openSession(cache cache.Cache) *Session {
ssn := &Session{
UID: uuid.NewUUID(),
cache: cache,
UID: uuid.NewUUID(),
kubeClient: cache.Client(),
cache: cache,

podGroupStatus: map[api.JobID]*scheduling.PodGroupStatus{},

Expand Down
6 changes: 2 additions & 4 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
cache schedcache.Cache
config *rest.Config
actions []framework.Action
plugins []conf.Tier
configurations []conf.Configuration
Expand All @@ -45,13 +44,12 @@ type Scheduler struct {
func NewScheduler(
config *rest.Config,
schedulerName string,
conf string,
schedulerConf string,
period time.Duration,
defaultQueue string,
) (*Scheduler, error) {
scheduler := &Scheduler{
config: config,
schedulerConf: conf,
schedulerConf: schedulerConf,
cache: schedcache.New(config, schedulerName, defaultQueue),
schedulePeriod: period,
}
Expand Down

0 comments on commit 2db93ac

Please sign in to comment.