Skip to content

Commit

Permalink
Fix: Wait until the submitter finishes before updating job state (#619)
Browse files Browse the repository at this point in the history
Co-authored-by: Yunus Olgun <[email protected]>
  • Loading branch information
yolgun and yolgun authored Feb 1, 2023
1 parent 0a1e0b5 commit 2dd656f
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 5 deletions.
4 changes: 2 additions & 2 deletions controllers/flinkcluster/flinkcluster_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,10 @@ func (observer *ClusterStateObserver) observeJob(
}

var submitterLog *SubmitterLog
// Extract submission result only when it is in deployment progress.
// Extract submission result only when it is in deployment progress or the submitter pod failed.
// It is not necessary to get the log stream from the submitter pod always.
var jobDeployInProgress = recordedJob != nil && recordedJob.State == v1beta1.JobStateDeploying
if jobPod != nil && jobDeployInProgress {
if jobPod != nil && (jobDeployInProgress || jobPod.Status.Phase == corev1.PodFailed) {
var err error
submitterLog, err = getFlinkJobSubmitLog(observer.k8sClientset, jobPod)
if err != nil {
Expand Down
11 changes: 8 additions & 3 deletions controllers/flinkcluster/flinkcluster_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,9 +654,15 @@ func (updater *ClusterStatusUpdater) deriveJobStatus() *v1beta1.JobStatus {
newJobState = v1beta1.JobStateDeploying
// Derive the job state from the observed Flink job, if it exists.
case observedFlinkJob != nil:
newJobState = getFlinkJobDeploymentState(observedFlinkJob.State)
newJob.ID = observedFlinkJob.Id
newJob.Name = observedFlinkJob.Name
tmpState := getFlinkJobDeploymentState(observedFlinkJob.State)
if observedSubmitter.job == nil || tmpState != v1beta1.JobStateSucceeded {
newJobState = tmpState
break
}
updater.log.Info("The submitter maybe still running. Waiting for it")
fallthrough
case oldJob.IsActive() && observedSubmitter.job != nil && observedSubmitter.job.Status.Active == 0:
if observedSubmitter.job.Status.Succeeded == 1 {
newJobState = v1beta1.JobStateSucceeded
Expand Down Expand Up @@ -725,8 +731,7 @@ func (updater *ClusterStatusUpdater) deriveJobStatus() *v1beta1.JobStatus {
for _, e := range exceptions.Exceptions {
newJob.FailureReasons = append(newJob.FailureReasons, e.Exception)
}
}
if observedSubmitter.log != nil {
} else if observedSubmitter.log != nil {
newJob.FailureReasons = append(newJob.FailureReasons, observedSubmitter.log.message)
}
}
Expand Down

0 comments on commit 2dd656f

Please sign in to comment.