Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Small rework around job observer #284

Merged
merged 1 commit into from
Feb 22, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 31 additions & 31 deletions controllers/flinkcluster/flinkcluster_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,38 +268,52 @@ func (observer *ClusterStateObserver) observeJob(
return nil
}
var log = observer.log
var recorded = observed.cluster.Status
var err error

// Extract the log stream from pod only when the job state is Deploying.
var recordedJob = observed.cluster.Status.Components.Job

// Extract submission result only when it is in deployment progress.
// It is not necessary to get the log stream from the submitter pod always.
var jobDeployInProgress = recordedJob != nil && recordedJob.State == v1beta1.JobStateDeploying
if !jobDeployInProgress {
return nil
}

var jobName = getJobName(observed.cluster.Name)
// Observe the Flink job submitter.
var submitter FlinkJobSubmitter

// Extract the log stream from pod only when the job state is Deploying.
var recordedJob = recorded.Components.Job
err = observer.observeSubmitter(recordedJob, &submitter)
if err != nil {
if err := observer.observeSubmitter(jobName, &submitter); err != nil {
log.Error(err, "Failed to get the status of the job submitter")
}
observed.flinkJobSubmitter = submitter

// Observe the Flink job status.
var flinkJobID string
if jobId, ok := submitter.pod.Labels["job-id"]; ok {
flinkJobID = jobId
} else
// Get the ID from the job submitter.
if submitter.log != nil && submitter.log.jobID != "" {
flinkJobID = submitter.log.jobID
} else
// Otherwise, fall back to the job ID from the recorded job status, which was written in a previous iteration.
if recorded.Components.Job != nil {
flinkJobID = recorded.Components.Job.ID
if recordedJob != nil {
flinkJobID = recordedJob.ID
}

// Wait until the job manager is ready.
var observedFlinkJob FlinkJob
observer.observeFlinkJobStatus(observed, flinkJobID, &observedFlinkJob)
// Wait until the job manager is ready.
jmReady := observed.jmStatefulSet != nil && getStatefulSetState(observed.jmStatefulSet) == v1beta1.ComponentStateReady
if jmReady {
observer.observeFlinkJobStatus(observed, flinkJobID, &observedFlinkJob)
}
observed.flinkJob = observedFlinkJob

return nil
}

func (observer *ClusterStateObserver) observeSubmitter(recordedJob *v1beta1.JobStatus, submitter *FlinkJobSubmitter) error {
func (observer *ClusterStateObserver) observeSubmitter(jobName string, submitter *FlinkJobSubmitter) error {
var log = observer.log
var err error

Expand All @@ -310,7 +324,7 @@ func (observer *ClusterStateObserver) observeSubmitter(recordedJob *v1beta1.JobS

// Job resource.
job = new(batchv1.Job)
err = observer.observeJobSubmitter(job)
err = observer.observeJobSubmitter(jobName, job)
if err != nil {
if client.IgnoreNotFound(err) != nil {
log.Error(err, "Failed to get the job submitter")
Expand All @@ -331,7 +345,7 @@ func (observer *ClusterStateObserver) observeSubmitter(recordedJob *v1beta1.JobS
}
// Get job submitter pod resource.
pod = new(corev1.Pod)
err = observer.observeJobSubmitterPod(pod)
err = observer.observeJobSubmitterPod(jobName, pod)
if err != nil {
log.Error(err, "Failed to get the submitter pod")
return err
Expand All @@ -343,12 +357,6 @@ func (observer *ClusterStateObserver) observeSubmitter(recordedJob *v1beta1.JobS
}
submitter.pod = pod

// Extract submission result only when it is in deployment progress.
// It is not necessary to get the log stream from the submitter pod always.
var jobDeployInProgress = recordedJob != nil && recordedJob.State == v1beta1.JobStateDeploying
if !jobDeployInProgress {
return nil
}
log.Info("Extracting the result of job submission because it is completed")
podLog, err = getFlinkJobSubmitLog(observer.k8sClientset, pod)
if err != nil {
Expand Down Expand Up @@ -376,19 +384,11 @@ func (observer *ClusterStateObserver) observeSubmitter(recordedJob *v1beta1.JobS
// and running.
func (observer *ClusterStateObserver) observeFlinkJobStatus(observed *ObservedClusterState, flinkJobID string, flinkJob *FlinkJob) {
var log = observer.log

// Observe following
var flinkJobStatus *flink.Job
var flinkJobList *flink.JobsOverview
var flinkJobsUnexpected []string

// Wait until the job manager is ready.
jmReady := observed.jmStatefulSet != nil && getStatefulSetState(observed.jmStatefulSet) == v1beta1.ComponentStateReady
if !jmReady {
log.Info("Skip getting Flink job status; JobManager is not ready")
return
}

// Get Flink job status list.
flinkAPIBaseURL := getFlinkAPIBaseURL(observed.cluster)
flinkJobList, err := observer.flinkClient.GetJobsOverview(flinkAPIBaseURL)
Expand Down Expand Up @@ -551,25 +551,25 @@ func (observer *ClusterStateObserver) observeJobManagerIngress(
}

// observeJobSubmitter observes the job submitter's batchv1.Job resource.
// It looks the object up through observer.k8sClient.Get, using the namespace
// taken from observer.request and the supplied jobName, and stores the result
// in observedJob. The error returned by Get is passed back to the caller
// unchanged.
// NOTE(review): this span is rendered from a diff without +/- markers; the
// clusterName local and the `Name: getJobName(clusterName)` entry are
// presumably the removed lines and `Name: jobName` their replacement —
// confirm against the merged file.
func (observer *ClusterStateObserver) observeJobSubmitter(
jobName string,
observedJob *batchv1.Job) error {
var clusterNamespace = observer.request.Namespace
var clusterName = observer.request.Name

return observer.k8sClient.Get(
observer.context,
types.NamespacedName{
Namespace: clusterNamespace,
Name: getJobName(clusterName),
Name: jobName,
},
observedJob)
}

// observeJobSubmitterPod observes job submitter pod.
func (observer *ClusterStateObserver) observeJobSubmitterPod(
jobName string,
observedPod *corev1.Pod) error {
var clusterNamespace = observer.request.Namespace
var clusterName = observer.request.Name
var podSelector = labels.SelectorFromSet(map[string]string{"job-name": getJobName(clusterName)})
var podSelector = labels.SelectorFromSet(map[string]string{"job-name": jobName})
var podList = new(corev1.PodList)

var err = observer.k8sClient.List(
Expand Down