From 29c37b5d13f29a1a51a2a657c917eb565faddc93 Mon Sep 17 00:00:00 2001 From: yolgun Date: Mon, 16 May 2022 17:30:00 +0200 Subject: [PATCH] Fix busy but healthy job from getting lost (#401) Co-authored-by: Yunus Olgun --- .../flinkcluster/flinkcluster_observer.go | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/controllers/flinkcluster/flinkcluster_observer.go b/controllers/flinkcluster/flinkcluster_observer.go index 3b483ced..4b670567 100644 --- a/controllers/flinkcluster/flinkcluster_observer.go +++ b/controllers/flinkcluster/flinkcluster_observer.go @@ -384,18 +384,9 @@ func (observer *ClusterStateObserver) observeFlinkJobStatus(observed *ObservedCl flinkJobsUnexpected = append(flinkJobsUnexpected, job.Id) } } - - flinkJobExceptions, err := observer.flinkClient.GetJobExceptions(flinkAPIBaseURL, flinkJobID) - if err != nil { - // It is normal in many cases, not an error. - log.Info("Failed to get Flink job exceptions.", "error", err) - return - } - log.Info("Observed Flink job exceptions", "jobs", flinkJobExceptions) - flinkJob.exceptions = flinkJobExceptions - flinkJob.status = flinkJobStatus flinkJob.unexpected = flinkJobsUnexpected + log.Info("Observed Flink job", "submitted job status", flinkJob.status, "all job list", flinkJob.list, @@ -403,6 +394,19 @@ func (observer *ClusterStateObserver) observeFlinkJobStatus(observed *ObservedCl if len(flinkJobsUnexpected) > 0 { log.Info("More than one unexpected Flink job were found!") } + + if flinkJobID == "" { + log.Info("No flinkJobID given. Skipping get exceptions") + } else { + flinkJobExceptions, err := observer.flinkClient.GetJobExceptions(flinkAPIBaseURL, flinkJobID) + if err != nil { + // It is normal in many cases, not an error. + log.Info("Failed to get Flink job exceptions.", "error", err) + } else { + log.Info("Observed Flink job exceptions", "jobs", flinkJobExceptions) + flinkJob.exceptions = flinkJobExceptions + } + } } func (observer *ClusterStateObserver) observeSavepoint(cluster *v1beta1.FlinkCluster, savepoint *Savepoint) error {