Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add kubeClient in scheduler framwork.Session #860

Merged
merged 1 commit into from
Jun 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 18 additions & 13 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,15 @@ func New(config *rest.Config, schedulerName string, defaultQueue string) Cache {
return newSchedulerCache(config, schedulerName, defaultQueue)
}

//SchedulerCache cache for the kube batch
// SchedulerCache cache for the kube batch
type SchedulerCache struct {
sync.Mutex

kubeclient *kubernetes.Clientset
vcclient *vcclient.Clientset
kubeClient *kubernetes.Clientset
vcClient *vcclient.Clientset

defaultQueue string
// schedulerName is the name for kube batch scheduler
// schedulerName is the name for volcano scheduler
schedulerName string

podInformer infov1.PodInformer
Expand Down Expand Up @@ -286,8 +286,8 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s
PriorityClasses: make(map[string]*v1beta1.PriorityClass),
errTasks: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
deletedJobs: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
kubeclient: kubeClient,
vcclient: vcClient,
kubeClient: kubeClient,
vcClient: vcClient,
defaultQueue: defaultQueue,
schedulerName: schedulerName,

Expand All @@ -300,27 +300,27 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s
sc.Recorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: schedulerName})

sc.Binder = &defaultBinder{
kubeclient: sc.kubeclient,
kubeclient: sc.kubeClient,
}

sc.Evictor = &defaultEvictor{
kubeclient: sc.kubeclient,
kubeclient: sc.kubeClient,
recorder: sc.Recorder,
}

sc.StatusUpdater = &defaultStatusUpdater{
kubeclient: sc.kubeclient,
vcclient: sc.vcclient,
kubeclient: sc.kubeClient,
vcclient: sc.vcClient,
}

informerFactory := informers.NewSharedInformerFactory(sc.kubeclient, 0)
informerFactory := informers.NewSharedInformerFactory(sc.kubeClient, 0)

sc.pvcInformer = informerFactory.Core().V1().PersistentVolumeClaims()
sc.pvInformer = informerFactory.Core().V1().PersistentVolumes()
sc.scInformer = informerFactory.Storage().V1().StorageClasses()
sc.VolumeBinder = &defaultVolumeBinder{
volumeBinder: volumescheduling.NewVolumeBinder(
sc.kubeclient,
sc.kubeClient,
sc.nodeInformer,
nil,
sc.pvcInformer,
Expand Down Expand Up @@ -379,7 +379,7 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s
DeleteFunc: sc.DeleteResourceQuota,
})

vcinformers := vcinformer.NewSharedInformerFactory(sc.vcclient, 0)
vcinformers := vcinformer.NewSharedInformerFactory(sc.vcClient, 0)

// create informer for PodGroup(v1beta1) information
sc.podGroupInformerV1beta1 = vcinformers.Scheduling().V1beta1().PodGroups()
Expand Down Expand Up @@ -592,6 +592,11 @@ func (sc *SchedulerCache) BindVolumes(task *schedulingapi.TaskInfo) error {
return sc.VolumeBinder.BindVolumes(task)
}

// Client returns the kubernetes clientSet
func (sc *SchedulerCache) Client() kubernetes.Interface {
return sc.kubeClient
}

// taskUnschedulable updates pod status of pending task
func (sc *SchedulerCache) taskUnschedulable(task *schedulingapi.TaskInfo, message string) error {
pod := task.Pod
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/cache/event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (sc *SchedulerCache) addPod(pod *v1.Pod) error {
}

func (sc *SchedulerCache) syncTask(oldTask *schedulingapi.TaskInfo) error {
newPod, err := sc.kubeclient.CoreV1().Pods(oldTask.Namespace).Get(context.TODO(), oldTask.Name, metav1.GetOptions{})
newPod, err := sc.kubeClient.CoreV1().Pods(oldTask.Namespace).Get(context.TODO(), oldTask.Name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
err := sc.deleteTask(oldTask)
Expand Down
4 changes: 4 additions & 0 deletions pkg/scheduler/cache/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package cache

import (
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"

"volcano.sh/volcano/pkg/scheduler/api"
)
Expand Down Expand Up @@ -53,6 +54,9 @@ type Cache interface {

// BindVolumes binds volumes to the task
BindVolumes(task *api.TaskInfo) error

// Client returns the kubernetes clientSet, which can be used by plugins
Client() kubernetes.Interface
}

// VolumeBinder interface for allocate and bind volumes
Expand Down
9 changes: 6 additions & 3 deletions pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/kubernetes"
"k8s.io/klog"

"volcano.sh/volcano/pkg/apis/scheduling"
Expand All @@ -36,7 +37,8 @@ import (
type Session struct {
UID types.UID

cache cache.Cache
kubeClient kubernetes.Interface
cache cache.Cache

podGroupStatus map[api.JobID]*scheduling.PodGroupStatus

Expand Down Expand Up @@ -72,8 +74,9 @@ type Session struct {

func openSession(cache cache.Cache) *Session {
ssn := &Session{
UID: uuid.NewUUID(),
cache: cache,
UID: uuid.NewUUID(),
kubeClient: cache.Client(),
cache: cache,

podGroupStatus: map[api.JobID]*scheduling.PodGroupStatus{},

Expand Down
6 changes: 2 additions & 4 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
cache schedcache.Cache
config *rest.Config
actions []framework.Action
plugins []conf.Tier
configurations []conf.Configuration
Expand All @@ -45,13 +44,12 @@ type Scheduler struct {
func NewScheduler(
config *rest.Config,
schedulerName string,
conf string,
schedulerConf string,
period time.Duration,
defaultQueue string,
) (*Scheduler, error) {
scheduler := &Scheduler{
config: config,
schedulerConf: conf,
schedulerConf: schedulerConf,
cache: schedcache.New(config, schedulerName, defaultQueue),
schedulePeriod: period,
}
Expand Down