From e0c960a69a0840dd81a32ea86694815228c18ec9 Mon Sep 17 00:00:00 2001 From: Filipe Regadas Date: Tue, 22 Feb 2022 09:19:22 +0000 Subject: [PATCH] Small rework around job observer --- .../flinkcluster/flinkcluster_observer.go | 62 +++++++++---------- 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/controllers/flinkcluster/flinkcluster_observer.go b/controllers/flinkcluster/flinkcluster_observer.go index eab93c92..e69b1c15 100644 --- a/controllers/flinkcluster/flinkcluster_observer.go +++ b/controllers/flinkcluster/flinkcluster_observer.go @@ -268,38 +268,52 @@ func (observer *ClusterStateObserver) observeJob( return nil } var log = observer.log - var recorded = observed.cluster.Status - var err error + // Extract the log stream from pod only when the job state is Deploying. + var recordedJob = observed.cluster.Status.Components.Job + + // Extract submission result only when it is in deployment progress. + // It is not necessary to get the log stream from the submitter pod always. + var jobDeployInProgress = recordedJob != nil && recordedJob.State == v1beta1.JobStateDeploying + if !jobDeployInProgress { + return nil + } + + var jobName = getJobName(observed.cluster.Name) // Observe the Flink job submitter. var submitter FlinkJobSubmitter - - // Extract the log stream from pod only when the job state is Deploying. - var recordedJob = recorded.Components.Job - err = observer.observeSubmitter(recordedJob, &submitter) - if err != nil { + if err := observer.observeSubmitter(jobName, &submitter); err != nil { log.Error(err, "Failed to get the status of the job submitter") } observed.flinkJobSubmitter = submitter // Observe the Flink job status. var flinkJobID string + if jobId, ok := submitter.pod.Labels["job-id"]; ok { + flinkJobID = jobId + } else // Get the ID from the job submitter. if submitter.log != nil && submitter.log.jobID != "" { flinkJobID = submitter.log.jobID } else // Or get the job ID from the recorded job status which is written in previous iteration. - if recorded.Components.Job != nil { - flinkJobID = recorded.Components.Job.ID + if recordedJob != nil { + flinkJobID = recordedJob.ID } + + // Wait until the job manager is ready. var observedFlinkJob FlinkJob - observer.observeFlinkJobStatus(observed, flinkJobID, &observedFlinkJob) + // Wait until the job manager is ready. + jmReady := observed.jmStatefulSet != nil && getStatefulSetState(observed.jmStatefulSet) == v1beta1.ComponentStateReady + if jmReady { + observer.observeFlinkJobStatus(observed, flinkJobID, &observedFlinkJob) + } observed.flinkJob = observedFlinkJob return nil } -func (observer *ClusterStateObserver) observeSubmitter(recordedJob *v1beta1.JobStatus, submitter *FlinkJobSubmitter) error { +func (observer *ClusterStateObserver) observeSubmitter(jobName string, submitter *FlinkJobSubmitter) error { var log = observer.log var err error @@ -310,7 +324,7 @@ func (observer *ClusterStateObserver) observeSubmitter(recordedJob *v1beta1.JobS // Job resource. job = new(batchv1.Job) - err = observer.observeJobSubmitter(job) + err = observer.observeJobSubmitter(jobName, job) if err != nil { if client.IgnoreNotFound(err) != nil { log.Error(err, "Failed to get the job submitter") @@ -331,7 +345,7 @@ func (observer *ClusterStateObserver) observeSubmitter(recordedJob *v1beta1.JobS } // Get job submitter pod resource. pod = new(corev1.Pod) - err = observer.observeJobSubmitterPod(pod) + err = observer.observeJobSubmitterPod(jobName, pod) if err != nil { log.Error(err, "Failed to get the submitter pod") return err @@ -343,12 +357,6 @@ func (observer *ClusterStateObserver) observeSubmitter(recordedJob *v1beta1.JobS } submitter.pod = pod - // Extract submission result only when it is in deployment progress. - // It is not necessary to get the log stream from the submitter pod always. - var jobDeployInProgress = recordedJob != nil && recordedJob.State == v1beta1.JobStateDeploying - if !jobDeployInProgress { - return nil - } log.Info("Extracting the result of job submission because it is completed") podLog, err = getFlinkJobSubmitLog(observer.k8sClientset, pod) if err != nil { @@ -376,19 +384,11 @@ func (observer *ClusterStateObserver) observeSubmitter(recordedJob *v1beta1.JobS // and running. func (observer *ClusterStateObserver) observeFlinkJobStatus(observed *ObservedClusterState, flinkJobID string, flinkJob *FlinkJob) { var log = observer.log - // Observe following var flinkJobStatus *flink.Job var flinkJobList *flink.JobsOverview var flinkJobsUnexpected []string - // Wait until the job manager is ready. - jmReady := observed.jmStatefulSet != nil && getStatefulSetState(observed.jmStatefulSet) == v1beta1.ComponentStateReady - if !jmReady { - log.Info("Skip getting Flink job status; JobManager is not ready") - return - } - // Get Flink job status list. flinkAPIBaseURL := getFlinkAPIBaseURL(observed.cluster) flinkJobList, err := observer.flinkClient.GetJobsOverview(flinkAPIBaseURL) @@ -551,25 +551,25 @@ func (observer *ClusterStateObserver) observeJobManagerIngress( } func (observer *ClusterStateObserver) observeJobSubmitter( + jobName string, observedJob *batchv1.Job) error { var clusterNamespace = observer.request.Namespace - var clusterName = observer.request.Name return observer.k8sClient.Get( observer.context, types.NamespacedName{ Namespace: clusterNamespace, - Name: getJobName(clusterName), + Name: jobName, }, observedJob) } // observeJobSubmitterPod observes job submitter pod. func (observer *ClusterStateObserver) observeJobSubmitterPod( + jobName string, observedPod *corev1.Pod) error { var clusterNamespace = observer.request.Namespace - var clusterName = observer.request.Name - var podSelector = labels.SelectorFromSet(map[string]string{"job-name": getJobName(clusterName)}) + var podSelector = labels.SelectorFromSet(map[string]string{"job-name": jobName}) var podList = new(corev1.PodList) var err = observer.k8sClient.List(