Skip to content

Commit

Permalink
Remove volcano PodGroup when cluster is tearing down (#213)
Browse files Browse the repository at this point in the history
  • Loading branch information
regadas authored Jan 20, 2022
1 parent 309d334 commit ebc1255
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 7 deletions.
1 change: 1 addition & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -192,3 +192,4 @@ rules:
- get
- create
- update
- delete
24 changes: 20 additions & 4 deletions controllers/flinkcluster/batchscheduler/volcano/volcano.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,14 @@ func (v *VolcanoBatchScheduler) Name() string {

// Schedule reconciles batch scheduling
func (v *VolcanoBatchScheduler) Schedule(cluster *v1beta1.FlinkCluster, state *model.DesiredClusterState) error {
res, size := getClusterResource(state)
pg, err := v.syncPodGroup(cluster, size, res)
pg, err := v.syncPodGroup(cluster, state)
if err != nil {
return err
}
v.setSchedulerMeta(pg, state)

if pg != nil {
v.setSchedulerMeta(pg, state)
}
return nil
}

Expand Down Expand Up @@ -149,7 +151,21 @@ func (v *VolcanoBatchScheduler) updatePodGroup(pg *scheduling.PodGroup) (*schedu
Update(context.TODO(), pg, metav1.UpdateOptions{})
}

func (v *VolcanoBatchScheduler) syncPodGroup(cluster *v1beta1.FlinkCluster, size int32, minResource corev1.ResourceList) (*scheduling.PodGroup, error) {
func (v *VolcanoBatchScheduler) deletePodGroup(cluster *v1beta1.FlinkCluster) error {
podGroupName := v.getPodGroupName(cluster)
return v.volcanoClient.
SchedulingV1beta1().
PodGroups(cluster.Namespace).
Delete(context.TODO(), podGroupName, metav1.DeleteOptions{})
}

func (v *VolcanoBatchScheduler) syncPodGroup(cluster *v1beta1.FlinkCluster, state *model.DesiredClusterState) (*scheduling.PodGroup, error) {
if state.JmStatefulSet == nil && state.TmStatefulSet == nil {
// remove the podgroup if the JobManager/TaskManager statefulset are not set
return nil, v.deletePodGroup(cluster)
}

minResource, size := getClusterResource(state)
pg, err := v.getPodGroup(cluster)
if err != nil {
if !errors.IsNotFound(err) {
Expand Down
6 changes: 3 additions & 3 deletions controllers/flinkcluster/flinkcluster_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,12 @@ func (reconciler *ClusterReconciler) reconcile() (ctrl.Result, error) {
}

func (reconciler *ClusterReconciler) reconcileBatchScheduler() error {
if reconciler.observed.cluster.Spec.BatchScheduler == nil ||
reconciler.observed.cluster.Spec.BatchScheduler.Name == "" {
schedulerSpec := reconciler.observed.cluster.Spec.BatchScheduler
if schedulerSpec == nil || schedulerSpec.Name == "" {
return nil
}

scheduler, err := batchscheduler.GetScheduler(reconciler.observed.cluster.Spec.BatchScheduler.Name)
scheduler, err := batchscheduler.GetScheduler(schedulerSpec.Name)
if err != nil {
return err
}
Expand Down

0 comments on commit ebc1255

Please sign in to comment.