Chore: consistently pass FlinkCluster by reference (#307)
haoxins authored Mar 7, 2022
1 parent 7c9420c commit 830b419
Showing 4 changed files with 18 additions and 18 deletions.
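The pattern this commit applies: Go passes struct arguments by value, so helpers that took a v1beta1.FlinkCluster copied the whole object on every call and forced callers to dereference with *flinkCluster; with a *v1beta1.FlinkCluster parameter only the pointer is passed. A minimal, self-contained sketch of the two calling conventions, using a stand-in Cluster struct rather than the real (much larger) FlinkCluster type:

package main

import "fmt"

// Cluster stands in for v1beta1.FlinkCluster; the real type carries a full
// spec, status, and object metadata, so copying it is far more expensive.
type Cluster struct {
	Name string
}

// clusterLabelsByValue copies the entire struct into its parameter on every call.
func clusterLabelsByValue(c Cluster) map[string]string {
	return map[string]string{"cluster": c.Name, "app": "flink"}
}

// clusterLabelsByRef receives only a pointer; the struct itself is never copied.
func clusterLabelsByRef(c *Cluster) map[string]string {
	return map[string]string{"cluster": c.Name, "app": "flink"}
}

func main() {
	c := &Cluster{Name: "demo"}
	fmt.Println(clusterLabelsByValue(*c)) // caller has to dereference first
	fmt.Println(clusterLabelsByRef(c))    // caller hands the pointer through as-is
}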
18 changes: 9 additions & 9 deletions in controllers/flinkcluster/flinkcluster_converter.go
@@ -209,7 +209,7 @@ func newJobManagerPodSpec(mainContainer *corev1.Container, flinkCluster *v1beta1
func newJobManagerStatefulSet(flinkCluster *v1beta1.FlinkCluster) *appsv1.StatefulSet {
var jobManagerSpec = flinkCluster.Spec.JobManager
var jobManagerStatefulSetName = getJobManagerStatefulSetName(flinkCluster.Name)
-var podLabels = getComponentLabels(*flinkCluster, "jobmanager")
+var podLabels = getComponentLabels(flinkCluster, "jobmanager")
podLabels = mergeLabels(podLabels, jobManagerSpec.PodLabels)
var statefulSetLabels = mergeLabels(podLabels, getRevisionHashLabels(&flinkCluster.Status.Revision))

@@ -270,7 +270,7 @@ func newJobManagerService(flinkCluster *v1beta1.FlinkCluster) *corev1.Service {
Port: *jobManagerSpec.Ports.UI,
TargetPort: intstr.FromString("ui")}
var jobManagerServiceName = getJobManagerServiceName(clusterName)
-var podLabels = getComponentLabels(*flinkCluster, "jobmanager")
+var podLabels = getComponentLabels(flinkCluster, "jobmanager")
podLabels = mergeLabels(podLabels, jobManagerSpec.PodLabels)
var serviceLabels = mergeLabels(podLabels, getRevisionHashLabels(&flinkCluster.Status.Revision))
var jobManagerService = &corev1.Service{
@@ -332,7 +332,7 @@ func newJobManagerIngress(
var ingressHost string
var ingressTLS []networkingv1.IngressTLS
var labels = mergeLabels(
-getComponentLabels(*flinkCluster, "jobmanager"),
+getComponentLabels(flinkCluster, "jobmanager"),
getRevisionHashLabels(&flinkCluster.Status.Revision))
var pathType = networkingv1.PathTypePrefix
if jobManagerIngressSpec.HostFormat != nil {
@@ -479,7 +479,7 @@ func newTaskManagerPodSpec(mainContainer *corev1.Container, flinkCluster *v1beta
func newTaskManagerStatefulSet(flinkCluster *v1beta1.FlinkCluster) *appsv1.StatefulSet {
var taskManagerSpec = flinkCluster.Spec.TaskManager
var taskManagerStatefulSetName = getTaskManagerStatefulSetName(flinkCluster.Name)
-var podLabels = getComponentLabels(*flinkCluster, "taskmanager")
+var podLabels = getComponentLabels(flinkCluster, "taskmanager")
podLabels = mergeLabels(podLabels, taskManagerSpec.PodLabels)
var statefulSetLabels = mergeLabels(podLabels, getRevisionHashLabels(&flinkCluster.Status.Revision))

@@ -530,7 +530,7 @@ func newConfigMap(flinkCluster *v1beta1.FlinkCluster) *corev1.ConfigMap {
var tmPorts = flinkCluster.Spec.TaskManager.Ports
var configMapName = getConfigMapName(clusterName)
var labels = mergeLabels(
-getClusterLabels(*flinkCluster),
+getClusterLabels(flinkCluster),
getRevisionHashLabels(&flinkCluster.Status.Revision))
// Properties which should be provided from real deployed environment.
var flinkProps = map[string]string{
@@ -705,15 +705,15 @@ func newJob(flinkCluster *v1beta1.FlinkCluster) *batchv1.Job {

recorded := flinkCluster.Status
jobManagerSpec := flinkCluster.Spec.JobManager
-labels := getClusterLabels(*flinkCluster)
+labels := getClusterLabels(flinkCluster)
labels = mergeLabels(labels, getRevisionHashLabels(&recorded.Revision))

var jobName string
var annotations map[string]string
var podSpec *corev1.PodSpec

if IsApplicationModeCluster(flinkCluster) {
-labels = mergeLabels(labels, getComponentLabels(*flinkCluster, "jobmanager"))
+labels = mergeLabels(labels, getComponentLabels(flinkCluster, "jobmanager"))
labels = mergeLabels(labels, jobManagerSpec.PodLabels)
labels = mergeLabels(labels, map[string]string{
"job-id": flink.GenJobId(flinkCluster.Namespace, flinkCluster.Name),
@@ -1162,7 +1162,7 @@ func setGCPConfig(gcpConfig *v1beta1.GCPConfig, podSpec *corev1.PodSpec) bool {
return true
}

-func getClusterLabels(cluster v1beta1.FlinkCluster) map[string]string {
+func getClusterLabels(cluster *v1beta1.FlinkCluster) map[string]string {
return map[string]string{
"cluster": cluster.Name,
"app": "flink",
@@ -1177,7 +1177,7 @@ func getServiceAccountName(serviceAccount *string) string {
return ""
}

-func getComponentLabels(cluster v1beta1.FlinkCluster, component string) map[string]string {
+func getComponentLabels(cluster *v1beta1.FlinkCluster, component string) map[string]string {
return mergeLabels(getClusterLabels(cluster), map[string]string{
"component": component,
})
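Reassembled from the hunks above, the converter's two label helpers now share one pointer parameter and forward it without copying (a sketch only: it assumes the surrounding package, imports, and the mergeLabels helper already shown in the diff):

func getClusterLabels(cluster *v1beta1.FlinkCluster) map[string]string {
	return map[string]string{
		"cluster": cluster.Name,
		"app":     "flink",
	}
}

func getComponentLabels(cluster *v1beta1.FlinkCluster, component string) map[string]string {
	// The same pointer is forwarded to getClusterLabels, so no copy is made here either.
	return mergeLabels(getClusterLabels(cluster), map[string]string{
		"component": component,
	})
}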
6 changes: 3 additions & 3 deletions in controllers/flinkcluster/flinkcluster_reconciler.go
@@ -666,7 +666,7 @@ func (reconciler *ClusterReconciler) trySuspendJob() (*v1beta1.SavepointStatus,
var log = reconciler.log
var recorded = reconciler.observed.cluster.Status

-if !canTakeSavepoint(*reconciler.observed.cluster) {
+if !canTakeSavepoint(reconciler.observed.cluster) {
return nil, nil
}

@@ -753,7 +753,7 @@ func (reconciler *ClusterReconciler) cancelJobs(
// Takes a savepoint if possible then stops the job.
func (reconciler *ClusterReconciler) cancelFlinkJob(jobID string, takeSavepoint bool) error {
var log = reconciler.log
-if takeSavepoint && canTakeSavepoint(*reconciler.observed.cluster) {
+if takeSavepoint && canTakeSavepoint(reconciler.observed.cluster) {
log.Info("Taking savepoint before stopping job", "jobID", jobID)
var err = reconciler.takeSavepoint(jobID)
if err != nil {
@@ -802,7 +802,7 @@ func (reconciler *ClusterReconciler) shouldTakeSavepoint() v1beta1.SavepointReas
var savepoint = observed.cluster.Status.Savepoint
var newRequestedControl = getNewControlRequest(cluster)

-if !canTakeSavepoint(*reconciler.observed.cluster) {
+if !canTakeSavepoint(reconciler.observed.cluster) {
return ""
}

2 changes: 1 addition & 1 deletion in controllers/flinkcluster/flinkcluster_util.go
@@ -160,7 +160,7 @@ func isBlank(s *string) bool {
}

// Checks whether it is possible to take savepoint.
-func canTakeSavepoint(cluster v1beta1.FlinkCluster) bool {
+func canTakeSavepoint(cluster *v1beta1.FlinkCluster) bool {
var jobSpec = cluster.Spec.Job
var savepointStatus = cluster.Status.Savepoint
var job = cluster.Status.Components.Job
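Callers adjust in two ways, pulled together from the reconciler hunks above and the test file that follows (a fragment, not a complete file): code that already holds a pointer passes it straight through, while code that builds a FlinkCluster value takes its address.

// Reconciler side: observed.cluster is already a *v1beta1.FlinkCluster.
if !canTakeSavepoint(reconciler.observed.cluster) {
	return nil, nil
}

// Test side: the locally constructed value is passed by address.
var cluster = v1beta1.FlinkCluster{Spec: v1beta1.FlinkClusterSpec{}}
var take = canTakeSavepoint(&cluster)
assert.Equal(t, take, false)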
10 changes: 5 additions & 5 deletions in controllers/flinkcluster/flinkcluster_util_test.go
@@ -145,7 +145,7 @@ func TestCanTakeSavepoint(t *testing.T) {
var cluster = v1beta1.FlinkCluster{
Spec: v1beta1.FlinkClusterSpec{},
}
-var take = canTakeSavepoint(cluster)
+var take = canTakeSavepoint(&cluster)
assert.Equal(t, take, false)

// no savepointDir and job status
@@ -154,7 +154,7 @@ func TestCanTakeSavepoint(t *testing.T) {
Job: &v1beta1.JobSpec{},
},
}
-take = canTakeSavepoint(cluster)
+take = canTakeSavepoint(&cluster)
assert.Equal(t, take, false)

// no job status, job is to be started
@@ -164,7 +164,7 @@ func TestCanTakeSavepoint(t *testing.T) {
Job: &v1beta1.JobSpec{SavepointsDir: &savepointDir},
},
}
-take = canTakeSavepoint(cluster)
+take = canTakeSavepoint(&cluster)
assert.Equal(t, take, true)

// running job and no progressing savepoint
@@ -177,7 +177,7 @@ func TestCanTakeSavepoint(t *testing.T) {
Job: &v1beta1.JobStatus{State: "Running"},
}},
}
-take = canTakeSavepoint(cluster)
+take = canTakeSavepoint(&cluster)
assert.Equal(t, take, true)

// progressing savepoint
@@ -193,7 +193,7 @@ func TestCanTakeSavepoint(t *testing.T) {
Savepoint: &v1beta1.SavepointStatus{State: v1beta1.SavepointStateInProgress},
},
}
-take = canTakeSavepoint(cluster)
+take = canTakeSavepoint(&cluster)
assert.Equal(t, take, false)
}

