From 830b419aa48bfcaf160cabf6c3390f4720590bca Mon Sep 17 00:00:00 2001
From: Hao Xin
Date: Mon, 7 Mar 2022 19:30:35 +0800
Subject: [PATCH] Chore: always pass FlinkCluster by reference (#307)

---
 .../flinkcluster/flinkcluster_converter.go    | 18 +++++++++---------
 .../flinkcluster/flinkcluster_reconciler.go   |  6 +++---
 controllers/flinkcluster/flinkcluster_util.go |  2 +-
 .../flinkcluster/flinkcluster_util_test.go    | 10 +++++-----
 4 files changed, 18 insertions(+), 18 deletions(-)

diff --git a/controllers/flinkcluster/flinkcluster_converter.go b/controllers/flinkcluster/flinkcluster_converter.go
index a3aec762..aa61c235 100644
--- a/controllers/flinkcluster/flinkcluster_converter.go
+++ b/controllers/flinkcluster/flinkcluster_converter.go
@@ -209,7 +209,7 @@ func newJobManagerPodSpec(mainContainer *corev1.Container, flinkCluster *v1beta1
 func newJobManagerStatefulSet(flinkCluster *v1beta1.FlinkCluster) *appsv1.StatefulSet {
     var jobManagerSpec = flinkCluster.Spec.JobManager
     var jobManagerStatefulSetName = getJobManagerStatefulSetName(flinkCluster.Name)
-    var podLabels = getComponentLabels(*flinkCluster, "jobmanager")
+    var podLabels = getComponentLabels(flinkCluster, "jobmanager")
     podLabels = mergeLabels(podLabels, jobManagerSpec.PodLabels)
     var statefulSetLabels = mergeLabels(podLabels, getRevisionHashLabels(&flinkCluster.Status.Revision))
 
@@ -270,7 +270,7 @@ func newJobManagerService(flinkCluster *v1beta1.FlinkCluster) *corev1.Service {
         Port:       *jobManagerSpec.Ports.UI,
         TargetPort: intstr.FromString("ui")}
     var jobManagerServiceName = getJobManagerServiceName(clusterName)
-    var podLabels = getComponentLabels(*flinkCluster, "jobmanager")
+    var podLabels = getComponentLabels(flinkCluster, "jobmanager")
     podLabels = mergeLabels(podLabels, jobManagerSpec.PodLabels)
     var serviceLabels = mergeLabels(podLabels, getRevisionHashLabels(&flinkCluster.Status.Revision))
     var jobManagerService = &corev1.Service{
@@ -332,7 +332,7 @@ func newJobManagerIngress(
     var ingressHost string
     var ingressTLS []networkingv1.IngressTLS
     var labels = mergeLabels(
-        getComponentLabels(*flinkCluster, "jobmanager"),
+        getComponentLabels(flinkCluster, "jobmanager"),
         getRevisionHashLabels(&flinkCluster.Status.Revision))
     var pathType = networkingv1.PathTypePrefix
     if jobManagerIngressSpec.HostFormat != nil {
@@ -479,7 +479,7 @@ func newTaskManagerPodSpec(mainContainer *corev1.Container, flinkCluster *v1beta
 func newTaskManagerStatefulSet(flinkCluster *v1beta1.FlinkCluster) *appsv1.StatefulSet {
     var taskManagerSpec = flinkCluster.Spec.TaskManager
     var taskManagerStatefulSetName = getTaskManagerStatefulSetName(flinkCluster.Name)
-    var podLabels = getComponentLabels(*flinkCluster, "taskmanager")
+    var podLabels = getComponentLabels(flinkCluster, "taskmanager")
     podLabels = mergeLabels(podLabels, taskManagerSpec.PodLabels)
     var statefulSetLabels = mergeLabels(podLabels, getRevisionHashLabels(&flinkCluster.Status.Revision))
 
@@ -530,7 +530,7 @@ func newConfigMap(flinkCluster *v1beta1.FlinkCluster) *corev1.ConfigMap {
     var tmPorts = flinkCluster.Spec.TaskManager.Ports
     var configMapName = getConfigMapName(clusterName)
     var labels = mergeLabels(
-        getClusterLabels(*flinkCluster),
+        getClusterLabels(flinkCluster),
         getRevisionHashLabels(&flinkCluster.Status.Revision))
     // Properties which should be provided from real deployed environment.
     var flinkProps = map[string]string{
@@ -705,7 +705,7 @@ func newJob(flinkCluster *v1beta1.FlinkCluster) *batchv1.Job {
     recorded := flinkCluster.Status
     jobManagerSpec := flinkCluster.Spec.JobManager
 
-    labels := getClusterLabels(*flinkCluster)
+    labels := getClusterLabels(flinkCluster)
     labels = mergeLabels(labels, getRevisionHashLabels(&recorded.Revision))
 
     var jobName string
@@ -713,7 +713,7 @@
     var podSpec *corev1.PodSpec
 
     if IsApplicationModeCluster(flinkCluster) {
-        labels = mergeLabels(labels, getComponentLabels(*flinkCluster, "jobmanager"))
+        labels = mergeLabels(labels, getComponentLabels(flinkCluster, "jobmanager"))
         labels = mergeLabels(labels, jobManagerSpec.PodLabels)
         labels = mergeLabels(labels, map[string]string{
             "job-id": flink.GenJobId(flinkCluster.Namespace, flinkCluster.Name),
@@ -1162,7 +1162,7 @@ func setGCPConfig(gcpConfig *v1beta1.GCPConfig, podSpec *corev1.PodSpec) bool {
     return true
 }
 
-func getClusterLabels(cluster v1beta1.FlinkCluster) map[string]string {
+func getClusterLabels(cluster *v1beta1.FlinkCluster) map[string]string {
     return map[string]string{
         "cluster": cluster.Name,
         "app":     "flink",
@@ -1177,7 +1177,7 @@ func getServiceAccountName(serviceAccount *string) string {
     return ""
 }
 
-func getComponentLabels(cluster v1beta1.FlinkCluster, component string) map[string]string {
+func getComponentLabels(cluster *v1beta1.FlinkCluster, component string) map[string]string {
     return mergeLabels(getClusterLabels(cluster), map[string]string{
         "component": component,
     })
diff --git a/controllers/flinkcluster/flinkcluster_reconciler.go b/controllers/flinkcluster/flinkcluster_reconciler.go
index 02c0f417..402ea174 100644
--- a/controllers/flinkcluster/flinkcluster_reconciler.go
+++ b/controllers/flinkcluster/flinkcluster_reconciler.go
@@ -666,7 +666,7 @@ func (reconciler *ClusterReconciler) trySuspendJob() (*v1beta1.SavepointStatus,
     var log = reconciler.log
     var recorded = reconciler.observed.cluster.Status
 
-    if !canTakeSavepoint(*reconciler.observed.cluster) {
+    if !canTakeSavepoint(reconciler.observed.cluster) {
         return nil, nil
     }
 
@@ -753,7 +753,7 @@ func (reconciler *ClusterReconciler) cancelJobs(
 // Takes a savepoint if possible then stops the job.
 func (reconciler *ClusterReconciler) cancelFlinkJob(jobID string, takeSavepoint bool) error {
     var log = reconciler.log
-    if takeSavepoint && canTakeSavepoint(*reconciler.observed.cluster) {
+    if takeSavepoint && canTakeSavepoint(reconciler.observed.cluster) {
         log.Info("Taking savepoint before stopping job", "jobID", jobID)
         var err = reconciler.takeSavepoint(jobID)
         if err != nil {
@@ -802,7 +802,7 @@ func (reconciler *ClusterReconciler) shouldTakeSavepoint() v1beta1.SavepointReas
     var savepoint = observed.cluster.Status.Savepoint
     var newRequestedControl = getNewControlRequest(cluster)
 
-    if !canTakeSavepoint(*reconciler.observed.cluster) {
+    if !canTakeSavepoint(reconciler.observed.cluster) {
         return ""
     }
 
diff --git a/controllers/flinkcluster/flinkcluster_util.go b/controllers/flinkcluster/flinkcluster_util.go
index be5b9036..e01d6d86 100644
--- a/controllers/flinkcluster/flinkcluster_util.go
+++ b/controllers/flinkcluster/flinkcluster_util.go
@@ -160,7 +160,7 @@ func isBlank(s *string) bool {
 }
 
 // Checks whether it is possible to take savepoint.
-func canTakeSavepoint(cluster v1beta1.FlinkCluster) bool {
+func canTakeSavepoint(cluster *v1beta1.FlinkCluster) bool {
     var jobSpec = cluster.Spec.Job
     var savepointStatus = cluster.Status.Savepoint
     var job = cluster.Status.Components.Job
diff --git a/controllers/flinkcluster/flinkcluster_util_test.go b/controllers/flinkcluster/flinkcluster_util_test.go
index 9a888b8a..87997aa2 100644
--- a/controllers/flinkcluster/flinkcluster_util_test.go
+++ b/controllers/flinkcluster/flinkcluster_util_test.go
@@ -145,7 +145,7 @@ func TestCanTakeSavepoint(t *testing.T) {
     var cluster = v1beta1.FlinkCluster{
         Spec: v1beta1.FlinkClusterSpec{},
     }
-    var take = canTakeSavepoint(cluster)
+    var take = canTakeSavepoint(&cluster)
     assert.Equal(t, take, false)
 
     // no savepointDir and job status
@@ -154,7 +154,7 @@
             Job: &v1beta1.JobSpec{},
         },
     }
-    take = canTakeSavepoint(cluster)
+    take = canTakeSavepoint(&cluster)
     assert.Equal(t, take, false)
 
     // no job status, job is to be started
@@ -164,7 +164,7 @@
             Job: &v1beta1.JobSpec{SavepointsDir: &savepointDir},
         },
     }
-    take = canTakeSavepoint(cluster)
+    take = canTakeSavepoint(&cluster)
     assert.Equal(t, take, true)
 
     // running job and no progressing savepoint
@@ -177,7 +177,7 @@
             Job: &v1beta1.JobStatus{State: "Running"},
         }},
     }
-    take = canTakeSavepoint(cluster)
+    take = canTakeSavepoint(&cluster)
     assert.Equal(t, take, true)
 
     // progressing savepoint
@@ -193,7 +193,7 @@
             Savepoint: &v1beta1.SavepointStatus{State: v1beta1.SavepointStateInProgress},
         },
     }
-    take = canTakeSavepoint(cluster)
+    take = canTakeSavepoint(&cluster)
     assert.Equal(t, take, false)
 }
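
Reviewer note: the sketch below is a self-contained toy, not code from this
repository; the Cluster type and labelsFor* names are invented stand-ins for
v1beta1.FlinkCluster and the get*Labels helpers. It illustrates the asymmetry
this patch removes: with a by-value signature, a large CRD struct is copied at
every call and callers that hold a *FlinkCluster must write *flinkCluster at
some call sites and flinkCluster at others; with a pointer signature there is
no copy and every call site reads the same way.

package main

import "fmt"

// Cluster stands in for the much larger v1beta1.FlinkCluster struct.
type Cluster struct {
	Name string
}

// labelsForByValue mirrors the old signature: the struct is copied on
// every call, and a caller holding a *Cluster must dereference it first.
func labelsForByValue(c Cluster) map[string]string {
	return map[string]string{"cluster": c.Name, "app": "flink"}
}

// labelsFor mirrors the new signature: no copy, and a caller that
// already holds a pointer passes it through unchanged.
func labelsFor(c *Cluster) map[string]string {
	return map[string]string{"cluster": c.Name, "app": "flink"}
}

func main() {
	c := &Cluster{Name: "demo"}
	fmt.Println(labelsForByValue(*c)) // old style: *c copies the struct
	fmt.Println(labelsFor(c))         // new style: pointer passed as-is
}

One trade-off worth noting: a pointer parameter can be nil while a value
parameter cannot, so callees such as canTakeSavepoint now rely on callers
never passing a nil cluster.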