Skip to content

Commit

Permalink
Small rework around job observer (#284)
Browse files Browse the repository at this point in the history
  • Loading branch information
regadas authored Feb 22, 2022
1 parent 0349afa commit 4a9595f
Showing 1 changed file with 31 additions and 31 deletions.
62 changes: 31 additions & 31 deletions controllers/flinkcluster/flinkcluster_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,38 +268,52 @@ func (observer *ClusterStateObserver) observeJob(
return nil
}
var log = observer.log
var recorded = observed.cluster.Status
var err error

// Extract the log stream from pod only when the job state is Deploying.
var recordedJob = observed.cluster.Status.Components.Job

// Extract submission result only when it is in deployment progress.
// It is not necessary to get the log stream from the submitter pod always.
var jobDeployInProgress = recordedJob != nil && recordedJob.State == v1beta1.JobStateDeploying
if !jobDeployInProgress {
return nil
}

var jobName = getJobName(observed.cluster.Name)
// Observe the Flink job submitter.
var submitter FlinkJobSubmitter

// Extract the log stream from pod only when the job state is Deploying.
var recordedJob = recorded.Components.Job
err = observer.observeSubmitter(recordedJob, &submitter)
if err != nil {
if err := observer.observeSubmitter(jobName, &submitter); err != nil {
log.Error(err, "Failed to get the status of the job submitter")
}
observed.flinkJobSubmitter = submitter

// Observe the Flink job status.
var flinkJobID string
if jobId, ok := submitter.pod.Labels["job-id"]; ok {
flinkJobID = jobId
} else
// Get the ID from the job submitter.
if submitter.log != nil && submitter.log.jobID != "" {
flinkJobID = submitter.log.jobID
} else
// Or get the job ID from the recorded job status which is written in previous iteration.
if recorded.Components.Job != nil {
flinkJobID = recorded.Components.Job.ID
if recordedJob != nil {
flinkJobID = recordedJob.ID
}

// Wait until the job manager is ready.
var observedFlinkJob FlinkJob
observer.observeFlinkJobStatus(observed, flinkJobID, &observedFlinkJob)
// Wait until the job manager is ready.
jmReady := observed.jmStatefulSet != nil && getStatefulSetState(observed.jmStatefulSet) == v1beta1.ComponentStateReady
if jmReady {
observer.observeFlinkJobStatus(observed, flinkJobID, &observedFlinkJob)
}
observed.flinkJob = observedFlinkJob

return nil
}

func (observer *ClusterStateObserver) observeSubmitter(recordedJob *v1beta1.JobStatus, submitter *FlinkJobSubmitter) error {
func (observer *ClusterStateObserver) observeSubmitter(jobName string, submitter *FlinkJobSubmitter) error {
var log = observer.log
var err error

Expand All @@ -310,7 +324,7 @@ func (observer *ClusterStateObserver) observeSubmitter(recordedJob *v1beta1.JobS

// Job resource.
job = new(batchv1.Job)
err = observer.observeJobSubmitter(job)
err = observer.observeJobSubmitter(jobName, job)
if err != nil {
if client.IgnoreNotFound(err) != nil {
log.Error(err, "Failed to get the job submitter")
Expand All @@ -331,7 +345,7 @@ func (observer *ClusterStateObserver) observeSubmitter(recordedJob *v1beta1.JobS
}
// Get job submitter pod resource.
pod = new(corev1.Pod)
err = observer.observeJobSubmitterPod(pod)
err = observer.observeJobSubmitterPod(jobName, pod)
if err != nil {
log.Error(err, "Failed to get the submitter pod")
return err
Expand All @@ -343,12 +357,6 @@ func (observer *ClusterStateObserver) observeSubmitter(recordedJob *v1beta1.JobS
}
submitter.pod = pod

// Extract submission result only when it is in deployment progress.
// It is not necessary to get the log stream from the submitter pod always.
var jobDeployInProgress = recordedJob != nil && recordedJob.State == v1beta1.JobStateDeploying
if !jobDeployInProgress {
return nil
}
log.Info("Extracting the result of job submission because it is completed")
podLog, err = getFlinkJobSubmitLog(observer.k8sClientset, pod)
if err != nil {
Expand Down Expand Up @@ -376,19 +384,11 @@ func (observer *ClusterStateObserver) observeSubmitter(recordedJob *v1beta1.JobS
// and running.
func (observer *ClusterStateObserver) observeFlinkJobStatus(observed *ObservedClusterState, flinkJobID string, flinkJob *FlinkJob) {
var log = observer.log

// Observe following
var flinkJobStatus *flink.Job
var flinkJobList *flink.JobsOverview
var flinkJobsUnexpected []string

// Wait until the job manager is ready.
jmReady := observed.jmStatefulSet != nil && getStatefulSetState(observed.jmStatefulSet) == v1beta1.ComponentStateReady
if !jmReady {
log.Info("Skip getting Flink job status; JobManager is not ready")
return
}

// Get Flink job status list.
flinkAPIBaseURL := getFlinkAPIBaseURL(observed.cluster)
flinkJobList, err := observer.flinkClient.GetJobsOverview(flinkAPIBaseURL)
Expand Down Expand Up @@ -551,25 +551,25 @@ func (observer *ClusterStateObserver) observeJobManagerIngress(
}

func (observer *ClusterStateObserver) observeJobSubmitter(
jobName string,
observedJob *batchv1.Job) error {
var clusterNamespace = observer.request.Namespace
var clusterName = observer.request.Name

return observer.k8sClient.Get(
observer.context,
types.NamespacedName{
Namespace: clusterNamespace,
Name: getJobName(clusterName),
Name: jobName,
},
observedJob)
}

// observeJobSubmitterPod observes job submitter pod.
func (observer *ClusterStateObserver) observeJobSubmitterPod(
jobName string,
observedPod *corev1.Pod) error {
var clusterNamespace = observer.request.Namespace
var clusterName = observer.request.Name
var podSelector = labels.SelectorFromSet(map[string]string{"job-name": getJobName(clusterName)})
var podSelector = labels.SelectorFromSet(map[string]string{"job-name": jobName})
var podList = new(corev1.PodList)

var err = observer.k8sClient.List(
Expand Down

0 comments on commit 4a9595f

Please sign in to comment.