From b6ebe5b4dfc532e85c7745307f4aa4cea36e1065 Mon Sep 17 00:00:00 2001 From: regadas Date: Mon, 18 Apr 2022 12:28:50 +0100 Subject: [PATCH 1/2] Avoid spawning flink cluster on user control annotation --- .../flinkcluster/flinkcluster_converter.go | 4 +--- .../flinkcluster/flinkcluster_reconciler.go | 17 ++++++++++++----- .../flinkcluster/flinkcluster_updater.go | 3 +++ 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/controllers/flinkcluster/flinkcluster_converter.go b/controllers/flinkcluster/flinkcluster_converter.go index 28323cc5..6053cb1c 100644 --- a/controllers/flinkcluster/flinkcluster_converter.go +++ b/controllers/flinkcluster/flinkcluster_converter.go @@ -933,10 +933,8 @@ func getJobManagerIngressHost(ingressHostFormat string, clusterName string) stri // Checks whether the component should be deleted according to the cleanup // policy. Always return false for session cluster. -func shouldCleanup( - cluster *v1beta1.FlinkCluster, component string) bool { +func shouldCleanup(cluster *v1beta1.FlinkCluster, component string) bool { var jobStatus = cluster.Status.Components.Job - // Session cluster. if jobStatus == nil { return false diff --git a/controllers/flinkcluster/flinkcluster_reconciler.go b/controllers/flinkcluster/flinkcluster_reconciler.go index 470ed0a9..958c797e 100644 --- a/controllers/flinkcluster/flinkcluster_reconciler.go +++ b/controllers/flinkcluster/flinkcluster_reconciler.go @@ -579,6 +579,8 @@ func (reconciler *ClusterReconciler) reconcileJob() (ctrl.Result, error) { var newControlStatus *v1beta1.FlinkClusterControlStatus defer reconciler.updateStatus(&newSavepointStatus, &newControlStatus) + observedSubmitter := observed.flinkJobSubmitter.job + // Create new Flink job submitter when starting new job, updating job or restarting job in failure. if desiredJob != nil && !job.IsActive() { log.Info("Deploying Flink job") @@ -604,7 +606,6 @@ func (reconciler *ClusterReconciler) reconcileJob() (ctrl.Result, error) { log.Info("Failed to update the job status for job submission") return requeueResult, err } - var observedSubmitter = observed.flinkJobSubmitter.job if observedSubmitter != nil { log.Info("Found old job submitter") err = reconciler.deleteJob(observedSubmitter) @@ -656,20 +657,26 @@ func (reconciler *ClusterReconciler) reconcileJob() (ctrl.Result, error) { // Job cancel requested. Stop Flink job. if desiredJob == nil && job.IsActive() { + userControl := getNewControlRequest(observed.cluster) + if userControl == v1beta1.ControlNameJobCancel { + newControlStatus = getControlStatus(userControl, v1beta1.ControlStateInProgress) + } + log.Info("Stopping job", "jobID", jobID) if err := reconciler.cancelRunningJobs(true /* takeSavepoint */); err != nil { return requeueResult, err } - var userControl = getNewControlRequest(observed.cluster) - if userControl == v1beta1.ControlNameJobCancel { - newControlStatus = getControlStatus(userControl, v1beta1.ControlStateInProgress) - } return requeueResult, err } if job.IsStopped() { log.Info("Job has finished, no action") + if observedSubmitter != nil { + if err := reconciler.deleteJob(observedSubmitter); err != nil { + return requeueResult, err + } + } } return ctrl.Result{}, nil diff --git a/controllers/flinkcluster/flinkcluster_updater.go b/controllers/flinkcluster/flinkcluster_updater.go index 6124696e..8f8f5de4 100644 --- a/controllers/flinkcluster/flinkcluster_updater.go +++ b/controllers/flinkcluster/flinkcluster_updater.go @@ -590,6 +590,8 @@ func (updater *ClusterStatusUpdater) deriveJobStatus() *v1beta1.JobStatus { } else { newJobState = oldJob.State } + case shouldStopJob(observedCluster): + newJobState = v1beta1.JobStateCancelled // When Flink job not found in JobManager or JobManager is unavailable case isFlinkAPIReady(observed.flinkJob.list): if oldJob.State == v1beta1.JobStateRunning { @@ -615,6 +617,7 @@ func (updater *ClusterStatusUpdater) deriveJobStatus() *v1beta1.JobStatus { newJobState = v1beta1.JobStateDeployFailed break } + newJobState = oldJob.State } // Update State From 6ba0b2e5ec5bd60bfd05a91830d7437ec61209c6 Mon Sep 17 00:00:00 2001 From: regadas Date: Tue, 19 Apr 2022 13:02:53 +0100 Subject: [PATCH 2/2] Attach job cleanup to cleanup policy --- .../flinkcluster/flinkcluster_converter.go | 3 +- .../flinkcluster/flinkcluster_reconciler.go | 33 +++++++++++-------- .../flinkcluster/flinkcluster_updater.go | 4 +-- 3 files changed, 24 insertions(+), 16 deletions(-) diff --git a/controllers/flinkcluster/flinkcluster_converter.go b/controllers/flinkcluster/flinkcluster_converter.go index 6053cb1c..cc8695db 100644 --- a/controllers/flinkcluster/flinkcluster_converter.go +++ b/controllers/flinkcluster/flinkcluster_converter.go @@ -112,7 +112,8 @@ func getDesiredClusterState(observed *ObservedClusterState) *model.DesiredCluste jobStatus := cluster.Status.Components.Job keepJobState := (shouldStopJob(cluster) || jobStatus.IsStopped()) && - (!shouldUpdateJob(observed) && !jobStatus.ShouldRestart(jobSpec)) + (!shouldUpdateJob(observed) && !jobStatus.ShouldRestart(jobSpec)) && + shouldCleanup(cluster, "Job") if !keepJobState { state.Job = newJob(cluster) diff --git a/controllers/flinkcluster/flinkcluster_reconciler.go b/controllers/flinkcluster/flinkcluster_reconciler.go index 958c797e..74c46e74 100644 --- a/controllers/flinkcluster/flinkcluster_reconciler.go +++ b/controllers/flinkcluster/flinkcluster_reconciler.go @@ -581,6 +581,10 @@ func (reconciler *ClusterReconciler) reconcileJob() (ctrl.Result, error) { observedSubmitter := observed.flinkJobSubmitter.job + if desiredJob != nil && job.IsTerminated(jobSpec) { + return ctrl.Result{}, nil + } + // Create new Flink job submitter when starting new job, updating job or restarting job in failure. if desiredJob != nil && !job.IsActive() { log.Info("Deploying Flink job") @@ -656,29 +660,32 @@ func (reconciler *ClusterReconciler) reconcileJob() (ctrl.Result, error) { } // Job cancel requested. Stop Flink job. - if desiredJob == nil && job.IsActive() { - userControl := getNewControlRequest(observed.cluster) - if userControl == v1beta1.ControlNameJobCancel { - newControlStatus = getControlStatus(userControl, v1beta1.ControlStateInProgress) - } + if desiredJob == nil { + if job.IsActive() { + userControl := getNewControlRequest(observed.cluster) + if userControl == v1beta1.ControlNameJobCancel { + newControlStatus = getControlStatus(userControl, v1beta1.ControlStateInProgress) + } + + log.Info("Stopping job", "jobID", jobID) + if err := reconciler.cancelRunningJobs(true /* takeSavepoint */); err != nil { + return requeueResult, err + } - log.Info("Stopping job", "jobID", jobID) - if err := reconciler.cancelRunningJobs(true /* takeSavepoint */); err != nil { return requeueResult, err } - return requeueResult, err - } - - if job.IsStopped() { - log.Info("Job has finished, no action") - if observedSubmitter != nil { + if job.IsStopped() && observedSubmitter != nil { if err := reconciler.deleteJob(observedSubmitter); err != nil { return requeueResult, err } } } + if job.IsStopped() { + log.Info("Job has finished, no action") + } + return ctrl.Result{}, nil } diff --git a/controllers/flinkcluster/flinkcluster_updater.go b/controllers/flinkcluster/flinkcluster_updater.go index 8f8f5de4..924cb6ff 100644 --- a/controllers/flinkcluster/flinkcluster_updater.go +++ b/controllers/flinkcluster/flinkcluster_updater.go @@ -569,10 +569,10 @@ func (updater *ClusterStatusUpdater) deriveJobStatus() *v1beta1.JobStatus { newJobState = v1beta1.JobStateUpdating case oldJob.ShouldRestart(jobSpec): newJobState = v1beta1.JobStateRestarting - case oldJob.IsPending() && oldJob.DeployTime != "": - newJobState = v1beta1.JobStateDeploying case oldJob.IsStopped(): newJobState = oldJob.State + case oldJob.IsPending() && oldJob.DeployTime != "": + newJobState = v1beta1.JobStateDeploying // Derive the job state from the observed Flink job, if it exists. case observedFlinkJob != nil: newJobState = getFlinkJobDeploymentState(observedFlinkJob.State)