Skip to content

Commit

Permalink
Revert "Skip validation when job-cancel control applied" (#610)
Browse files Browse the repository at this point in the history
  • Loading branch information
regadas authored Jan 23, 2023
1 parent d773bcb commit 41e5bea
Showing 1 changed file with 11 additions and 15 deletions.
26 changes: 11 additions & 15 deletions apis/flinkcluster/v1beta1/flinkcluster_validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (v *Validator) ValidateCreate(cluster *FlinkCluster) error {
// ValidateUpdate validates update request.
func (v *Validator) ValidateUpdate(old *FlinkCluster, new *FlinkCluster) error {
var err error
controlName, err := v.checkControlAnnotations(old, new)
err = v.checkControlAnnotations(old, new)
if err != nil {
return err
}
Expand All @@ -101,7 +101,7 @@ func (v *Validator) ValidateUpdate(old *FlinkCluster, new *FlinkCluster) error {
if err != nil {
return err
}
if cancelRequested || controlName == ControlNameJobCancel {
if cancelRequested {
return nil
}

Expand Down Expand Up @@ -131,39 +131,35 @@ func (v *Validator) ValidateUpdate(old *FlinkCluster, new *FlinkCluster) error {
return nil
}

func (v *Validator) checkControlAnnotations(old *FlinkCluster, new *FlinkCluster) (string, error) {
func (v *Validator) checkControlAnnotations(old *FlinkCluster, new *FlinkCluster) error {
oldUserControl := old.Annotations[ControlAnnotation]
newUserControl, ok := new.Annotations[ControlAnnotation]
if ok {
if oldUserControl != newUserControl && old.Status.Control != nil && old.Status.Control.State == ControlStateInProgress {
return "", fmt.Errorf(ControlChangeWarnMsg, ControlAnnotation)
return fmt.Errorf(ControlChangeWarnMsg, ControlAnnotation)
}
switch newUserControl {
case ControlNameJobCancel:
var job = old.Status.Components.Job
if old.Spec.Job == nil {
return "", fmt.Errorf(SessionClusterWarnMsg, ControlNameJobCancel, ControlAnnotation)
return fmt.Errorf(SessionClusterWarnMsg, ControlNameJobCancel, ControlAnnotation)
} else if job == nil || job.IsTerminated(old.Spec.Job) {
return "", errors.NewResourceExpired(fmt.Sprintf(InvalidJobStateForJobCancelMsg, ControlAnnotation))
} else {
return ControlNameJobCancel, nil
return errors.NewResourceExpired(fmt.Sprintf(InvalidJobStateForJobCancelMsg, ControlAnnotation))
}
case ControlNameSavepoint:
var job = old.Status.Components.Job
if old.Spec.Job == nil {
return "", fmt.Errorf(SessionClusterWarnMsg, ControlNameSavepoint, ControlAnnotation)
return fmt.Errorf(SessionClusterWarnMsg, ControlNameSavepoint, ControlAnnotation)
} else if old.Spec.Job.SavepointsDir == nil || *old.Spec.Job.SavepointsDir == "" {
return "", fmt.Errorf(InvalidSavepointDirMsg, ControlAnnotation)
return fmt.Errorf(InvalidSavepointDirMsg, ControlAnnotation)
} else if job == nil || job.IsStopped() {
return "", fmt.Errorf(InvalidJobStateForSavepointMsg, ControlAnnotation)
} else {
return ControlNameSavepoint, nil
return fmt.Errorf(InvalidJobStateForSavepointMsg, ControlAnnotation)
}
default:
return "", fmt.Errorf(InvalidControlAnnMsg, ControlAnnotation, newUserControl)
return fmt.Errorf(InvalidControlAnnMsg, ControlAnnotation, newUserControl)
}
}
return "", nil
return nil
}

func (v *Validator) checkCancelRequested(
Expand Down

0 comments on commit 41e5bea

Please sign in to comment.