Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Joe Nathan Abellard <[email protected]>
  • Loading branch information
jabellard committed Sep 9, 2024
1 parent ab0dfc5 commit 1a33436
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 12 deletions.
15 changes: 13 additions & 2 deletions pkg/descheduler/descheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,12 @@ func (d *Descheduler) establishEstimatorConnections() {
return
}
for i := range clusterList.Items {
if err = estimatorclient.EstablishConnection(d.KubeClient, clusterList.Items[i].Name, d.schedulerEstimatorCache, d.schedulerEstimatorServiceNamespace, d.schedulerEstimatorServicePrefix, d.schedulerEstimatorClientConfig); err != nil {
serviceInfo := estimatorclient.SchedulerEstimatorServiceInfo{
Name: clusterList.Items[i].Name,
Namespace: d.schedulerEstimatorServiceNamespace,
NamePrefix: d.schedulerEstimatorServicePrefix,
}
if err = estimatorclient.EstablishConnection(d.KubeClient, serviceInfo, d.schedulerEstimatorCache, d.schedulerEstimatorClientConfig); err != nil {
klog.Error(err)
}
}
Expand All @@ -306,7 +311,13 @@ func (d *Descheduler) reconcileEstimatorConnection(key util.QueueKey) error {
}
return err
}
return estimatorclient.EstablishConnection(d.KubeClient, name, d.schedulerEstimatorCache, d.schedulerEstimatorServiceNamespace, d.schedulerEstimatorServicePrefix, d.schedulerEstimatorClientConfig)
serviceInfo := estimatorclient.SchedulerEstimatorServiceInfo{
Name: name,
Namespace: d.schedulerEstimatorServiceNamespace,
NamePrefix: d.schedulerEstimatorServicePrefix,
}
return estimatorclient.EstablishConnection(d.KubeClient, serviceInfo, d.schedulerEstimatorCache, d.schedulerEstimatorClientConfig)

}

func (d *Descheduler) recordDescheduleResultEventForResourceBinding(rb *workv1alpha2.ResourceBinding, message string, err error) {
Expand Down
23 changes: 15 additions & 8 deletions pkg/estimator/client/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ type SchedulerEstimatorCache struct {
estimator map[string]*clientWrapper
}

// SchedulerEstimatorServiceInfo contains information needed to discover and connect to a scheduler estimator service.
type SchedulerEstimatorServiceInfo struct {
Name string
NamePrefix string
Namespace string
}

// NewSchedulerEstimatorCache returns an accurate scheduler estimator cache.
func NewSchedulerEstimatorCache() *SchedulerEstimatorCache {
return &SchedulerEstimatorCache{
Expand Down Expand Up @@ -96,25 +103,25 @@ func (c *SchedulerEstimatorCache) GetClient(name string) (estimatorservice.Estim
}

// EstablishConnection establishes a new gRPC connection with the specified cluster scheduler estimator.
func EstablishConnection(kubeClient kubernetes.Interface, name string, estimatorCache *SchedulerEstimatorCache, estimatorServiceNamespace string, estimatorServicePrefix string, grpcConfig *grpcconnection.ClientConfig) error {
if estimatorCache.IsEstimatorExist(name) {
func EstablishConnection(kubeClient kubernetes.Interface, serviceInfo SchedulerEstimatorServiceInfo, estimatorCache *SchedulerEstimatorCache, grpcConfig *grpcconnection.ClientConfig) error {
if estimatorCache.IsEstimatorExist(serviceInfo.Name) {
return nil
}

serverAddr, err := resolveCluster(kubeClient, estimatorServiceNamespace,
names.GenerateEstimatorServiceName(estimatorServicePrefix, name), int32(grpcConfig.TargetPort))
serverAddr, err := resolveCluster(kubeClient, serviceInfo.Namespace,
names.GenerateEstimatorServiceName(serviceInfo.NamePrefix, serviceInfo.Name), int32(grpcConfig.TargetPort))
if err != nil {
return err
}

klog.Infof("Start dialing estimator server(%s) of cluster(%s).", serverAddr, name)
klog.Infof("Start dialing estimator server(%s) of cluster(%s).", serverAddr, serviceInfo.Name)
cc, err := grpcConfig.DialWithTimeOut(serverAddr, 5*time.Second)
if err != nil {
klog.Errorf("Failed to dial cluster(%s): %v.", name, err)
klog.Errorf("Failed to dial cluster(%s): %v.", serviceInfo.Name, err)
return err
}
c := estimatorservice.NewEstimatorClient(cc)
estimatorCache.AddCluster(name, cc, c)
klog.Infof("Connection with estimator server(%s) of cluster(%s) has been established.", serverAddr, name)
estimatorCache.AddCluster(serviceInfo.Name, cc, c)
klog.Infof("Connection with estimator server(%s) of cluster(%s) has been established.", serverAddr, serviceInfo.Name)
return nil
}
15 changes: 13 additions & 2 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,13 @@ func (s *Scheduler) reconcileEstimatorConnection(key util.QueueKey) error {
return nil
}

return estimatorclient.EstablishConnection(s.KubeClient, name, s.schedulerEstimatorCache, s.schedulerEstimatorServiceNamespace, s.schedulerEstimatorServicePrefix, s.schedulerEstimatorClientConfig)
serviceInfo := estimatorclient.SchedulerEstimatorServiceInfo{
Name: name,
Namespace: s.schedulerEstimatorServiceNamespace,
NamePrefix: s.schedulerEstimatorServicePrefix,
}
return estimatorclient.EstablishConnection(s.KubeClient, serviceInfo, s.schedulerEstimatorCache, s.schedulerEstimatorClientConfig)

}

func (s *Scheduler) establishEstimatorConnections() {
Expand All @@ -800,7 +806,12 @@ func (s *Scheduler) establishEstimatorConnections() {
if clusterList.Items[i].Spec.SyncMode == clusterv1alpha1.Pull && s.disableSchedulerEstimatorInPullMode {
continue
}
if err = estimatorclient.EstablishConnection(s.KubeClient, clusterList.Items[i].Name, s.schedulerEstimatorCache, s.schedulerEstimatorServiceNamespace, s.schedulerEstimatorServicePrefix, s.schedulerEstimatorClientConfig); err != nil {
serviceInfo := estimatorclient.SchedulerEstimatorServiceInfo{
Name: clusterList.Items[i].Name,
Namespace: s.schedulerEstimatorServiceNamespace,
NamePrefix: s.schedulerEstimatorServicePrefix,
}
if err = estimatorclient.EstablishConnection(s.KubeClient, serviceInfo, s.schedulerEstimatorCache, s.schedulerEstimatorClientConfig); err != nil {
klog.Error(err)
}
}
Expand Down

0 comments on commit 1a33436

Please sign in to comment.