Skip to content

Commit

Permalink
Prevent a busy but healthy job from getting lost (#401)
Browse files: browse the repository at this point in the history
Co-authored-by: Yunus Olgun <[email protected]>
  • Loading branch information
yolgun and yolgun authored May 16, 2022
1 parent 22565d5 commit 29c37b5
Showing 1 changed file with 14 additions and 10 deletions.
24 changes: 14 additions & 10 deletions controllers/flinkcluster/flinkcluster_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,25 +384,29 @@ func (observer *ClusterStateObserver) observeFlinkJobStatus(observed *ObservedCl
flinkJobsUnexpected = append(flinkJobsUnexpected, job.Id)
}
}

flinkJobExceptions, err := observer.flinkClient.GetJobExceptions(flinkAPIBaseURL, flinkJobID)
if err != nil {
// It is normal in many cases, not an error.
log.Info("Failed to get Flink job exceptions.", "error", err)
return
}
log.Info("Observed Flink job exceptions", "jobs", flinkJobExceptions)
flinkJob.exceptions = flinkJobExceptions

flinkJob.status = flinkJobStatus
flinkJob.unexpected = flinkJobsUnexpected

log.Info("Observed Flink job",
"submitted job status", flinkJob.status,
"all job list", flinkJob.list,
"unexpected job list", flinkJob.unexpected)
if len(flinkJobsUnexpected) > 0 {
log.Info("More than one unexpected Flink job were found!")
}

if flinkJobID == "" {
log.Info("No flinkJobID given. Skipping get exceptions")
} else {
flinkJobExceptions, err := observer.flinkClient.GetJobExceptions(flinkAPIBaseURL, flinkJobID)
if err != nil {
// It is normal in many cases, not an error.
log.Info("Failed to get Flink job exceptions.", "error", err)
} else {
log.Info("Observed Flink job exceptions", "jobs", flinkJobExceptions)
flinkJob.exceptions = flinkJobExceptions
}
}
}

func (observer *ClusterStateObserver) observeSavepoint(cluster *v1beta1.FlinkCluster, savepoint *Savepoint) error {
Expand Down

0 comments on commit 29c37b5

Please sign in to comment.