Chore: consistently pass the FlinkCluster as a ref #307

Merged (1 commit) on Mar 7, 2022
18 changes: 9 additions & 9 deletions controllers/flinkcluster/flinkcluster_converter.go
@@ -205,7 +205,7 @@ func newJobManagerPodSpec(mainContainer *corev1.Container, flinkCluster *v1beta1
func newJobManagerStatefulSet(flinkCluster *v1beta1.FlinkCluster) *appsv1.StatefulSet {
var jobManagerSpec = flinkCluster.Spec.JobManager
var jobManagerStatefulSetName = getJobManagerStatefulSetName(flinkCluster.Name)
- var podLabels = getComponentLabels(*flinkCluster, "jobmanager")
+ var podLabels = getComponentLabels(flinkCluster, "jobmanager")
podLabels = mergeLabels(podLabels, jobManagerSpec.PodLabels)
var statefulSetLabels = mergeLabels(podLabels, getRevisionHashLabels(&flinkCluster.Status.Revision))

@@ -266,7 +266,7 @@ func newJobManagerService(flinkCluster *v1beta1.FlinkCluster) *corev1.Service {
Port: *jobManagerSpec.Ports.UI,
TargetPort: intstr.FromString("ui")}
var jobManagerServiceName = getJobManagerServiceName(clusterName)
- var podLabels = getComponentLabels(*flinkCluster, "jobmanager")
+ var podLabels = getComponentLabels(flinkCluster, "jobmanager")
podLabels = mergeLabels(podLabels, jobManagerSpec.PodLabels)
var serviceLabels = mergeLabels(podLabels, getRevisionHashLabels(&flinkCluster.Status.Revision))
var jobManagerService = &corev1.Service{
@@ -328,7 +328,7 @@ func newJobManagerIngress(
var ingressHost string
var ingressTLS []networkingv1.IngressTLS
var labels = mergeLabels(
- getComponentLabels(*flinkCluster, "jobmanager"),
+ getComponentLabels(flinkCluster, "jobmanager"),
getRevisionHashLabels(&flinkCluster.Status.Revision))
var pathType = networkingv1.PathTypePrefix
if jobManagerIngressSpec.HostFormat != nil {
@@ -475,7 +475,7 @@ func newTaskManagerPodSpec(mainContainer *corev1.Container, flinkCluster *v1beta
func newTaskManagerStatefulSet(flinkCluster *v1beta1.FlinkCluster) *appsv1.StatefulSet {
var taskManagerSpec = flinkCluster.Spec.TaskManager
var taskManagerStatefulSetName = getTaskManagerStatefulSetName(flinkCluster.Name)
- var podLabels = getComponentLabels(*flinkCluster, "taskmanager")
+ var podLabels = getComponentLabels(flinkCluster, "taskmanager")
podLabels = mergeLabels(podLabels, taskManagerSpec.PodLabels)
var statefulSetLabels = mergeLabels(podLabels, getRevisionHashLabels(&flinkCluster.Status.Revision))

@@ -526,7 +526,7 @@ func newConfigMap(flinkCluster *v1beta1.FlinkCluster) *corev1.ConfigMap {
var tmPorts = flinkCluster.Spec.TaskManager.Ports
var configMapName = getConfigMapName(clusterName)
var labels = mergeLabels(
- getClusterLabels(*flinkCluster),
+ getClusterLabels(flinkCluster),
getRevisionHashLabels(&flinkCluster.Status.Revision))
// Properties which should be provided from real deployed environment.
var flinkProps = map[string]string{
@@ -701,15 +701,15 @@ func newJob(flinkCluster *v1beta1.FlinkCluster) *batchv1.Job {

recorded := flinkCluster.Status
jobManagerSpec := flinkCluster.Spec.JobManager
- labels := getClusterLabels(*flinkCluster)
+ labels := getClusterLabels(flinkCluster)
labels = mergeLabels(labels, getRevisionHashLabels(&recorded.Revision))

var jobName string
var annotations map[string]string
var podSpec *corev1.PodSpec

if IsApplicationModeCluster(flinkCluster) {
- labels = mergeLabels(labels, getComponentLabels(*flinkCluster, "jobmanager"))
+ labels = mergeLabels(labels, getComponentLabels(flinkCluster, "jobmanager"))
labels = mergeLabels(labels, jobManagerSpec.PodLabels)
labels = mergeLabels(labels, map[string]string{
"job-id": flink.GenJobId(flinkCluster.Namespace, flinkCluster.Name),
@@ -1158,7 +1158,7 @@ func setGCPConfig(gcpConfig *v1beta1.GCPConfig, podSpec *corev1.PodSpec) bool {
return true
}

- func getClusterLabels(cluster v1beta1.FlinkCluster) map[string]string {
+ func getClusterLabels(cluster *v1beta1.FlinkCluster) map[string]string {
return map[string]string{
"cluster": cluster.Name,
"app": "flink",
@@ -1173,7 +1173,7 @@ func getServiceAccountName(serviceAccount *string) string {
return ""
}

- func getComponentLabels(cluster v1beta1.FlinkCluster, component string) map[string]string {
+ func getComponentLabels(cluster *v1beta1.FlinkCluster, component string) map[string]string {
return mergeLabels(getClusterLabels(cluster), map[string]string{
"component": component,
})
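The converter changes above all follow one pattern: the label helpers now take a *v1beta1.FlinkCluster instead of a value, so each call stops copying the whole FlinkCluster object and the call sites pass the pointer they already hold instead of dereferencing it. A minimal sketch of the pattern, using a hypothetical Cluster type rather than the real CRD struct:

package main

import "fmt"

// Cluster is a hypothetical stand-in for the much larger v1beta1.FlinkCluster struct.
type Cluster struct {
    Name string
}

// Old shape: the helper takes the struct by value, so every call copies it
// and a caller holding a *Cluster has to dereference it.
func labelsByValue(c Cluster) map[string]string {
    return map[string]string{"cluster": c.Name, "app": "flink"}
}

// New shape: the helper takes a pointer, so nothing is copied and the
// call site passes the pointer it already holds.
func labelsByRef(c *Cluster) map[string]string {
    return map[string]string{"cluster": c.Name, "app": "flink"}
}

func main() {
    cluster := &Cluster{Name: "demo"}
    fmt.Println(labelsByValue(*cluster)) // old call shape: copy plus dereference
    fmt.Println(labelsByRef(cluster))    // new call shape: pass the pointer as-is
}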
6 changes: 3 additions & 3 deletions controllers/flinkcluster/flinkcluster_reconciler.go
@@ -666,7 +666,7 @@ func (reconciler *ClusterReconciler) trySuspendJob() (*v1beta1.SavepointStatus,
var log = reconciler.log
var recorded = reconciler.observed.cluster.Status

- if !canTakeSavepoint(*reconciler.observed.cluster) {
+ if !canTakeSavepoint(reconciler.observed.cluster) {
return nil, nil
}

@@ -753,7 +753,7 @@ func (reconciler *ClusterReconciler) cancelJobs(
// Takes a savepoint if possible then stops the job.
func (reconciler *ClusterReconciler) cancelFlinkJob(jobID string, takeSavepoint bool) error {
var log = reconciler.log
- if takeSavepoint && canTakeSavepoint(*reconciler.observed.cluster) {
+ if takeSavepoint && canTakeSavepoint(reconciler.observed.cluster) {
log.Info("Taking savepoint before stopping job", "jobID", jobID)
var err = reconciler.takeSavepoint(jobID)
if err != nil {
@@ -802,7 +802,7 @@ func (reconciler *ClusterReconciler) shouldTakeSavepoint() v1beta1.SavepointReas
var savepoint = observed.cluster.Status.Savepoint
var newRequestedControl = getNewControlRequest(cluster)

- if !canTakeSavepoint(*reconciler.observed.cluster) {
+ if !canTakeSavepoint(reconciler.observed.cluster) {
return ""
}

2 changes: 1 addition & 1 deletion controllers/flinkcluster/flinkcluster_util.go
@@ -160,7 +160,7 @@ func isBlank(s *string) bool {
}

// Checks whether it is possible to take savepoint.
- func canTakeSavepoint(cluster v1beta1.FlinkCluster) bool {
+ func canTakeSavepoint(cluster *v1beta1.FlinkCluster) bool {
var jobSpec = cluster.Spec.Job
var savepointStatus = cluster.Status.Savepoint
var job = cluster.Status.Components.Job
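One nuance of changing canTakeSavepoint to a pointer parameter: with the old value parameter, a nil observed cluster would panic at the call site on the dereference (*reconciler.observed.cluster), whereas now a nil pointer can reach the function body. This diff leaves the body as it was; if a guard were wanted, a sketch (not part of this PR, with a hypothetical stand-in type) could look like:

// SavepointableCluster is a hypothetical stand-in for v1beta1.FlinkCluster,
// carrying only the fields this guard sketch needs.
type SavepointableCluster struct {
    HasJobSpec          bool
    SavepointInProgress bool
}

// canTakeSavepointSketch mirrors the shape of canTakeSavepoint after this change:
// because the parameter is now a pointer, nil is handled explicitly inside the
// function instead of panicking at the caller's dereference.
func canTakeSavepointSketch(cluster *SavepointableCluster) bool {
    if cluster == nil {
        return false // fail closed rather than dereference nil
    }
    return cluster.HasJobSpec && !cluster.SavepointInProgress
}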
10 changes: 5 additions & 5 deletions controllers/flinkcluster/flinkcluster_util_test.go
@@ -145,7 +145,7 @@ func TestCanTakeSavepoint(t *testing.T) {
var cluster = v1beta1.FlinkCluster{
Spec: v1beta1.FlinkClusterSpec{},
}
- var take = canTakeSavepoint(cluster)
+ var take = canTakeSavepoint(&cluster)
assert.Equal(t, take, false)

// no savepointDir and job status
@@ -154,7 +154,7 @@ func TestCanTakeSavepoint(t *testing.T) {
Job: &v1beta1.JobSpec{},
},
}
- take = canTakeSavepoint(cluster)
+ take = canTakeSavepoint(&cluster)
assert.Equal(t, take, false)

// no job status, job is to be started
@@ -164,7 +164,7 @@ func TestCanTakeSavepoint(t *testing.T) {
Job: &v1beta1.JobSpec{SavepointsDir: &savepointDir},
},
}
- take = canTakeSavepoint(cluster)
+ take = canTakeSavepoint(&cluster)
assert.Equal(t, take, true)

// running job and no progressing savepoint
@@ -177,7 +177,7 @@ func TestCanTakeSavepoint(t *testing.T) {
Job: &v1beta1.JobStatus{State: "Running"},
}},
}
- take = canTakeSavepoint(cluster)
+ take = canTakeSavepoint(&cluster)
assert.Equal(t, take, true)

// progressing savepoint
@@ -193,7 +193,7 @@ func TestCanTakeSavepoint(t *testing.T) {
Savepoint: &v1beta1.SavepointStatus{State: v1beta1.SavepointStateInProgress},
},
}
- take = canTakeSavepoint(cluster)
+ take = canTakeSavepoint(&cluster)
assert.Equal(t, take, false)
}
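The test updates above are mechanical: each call now passes &cluster instead of the value. Go also allows taking the address of a composite literal directly, so the same check can be written inline; a small sketch of the "job is to be started" case, reusing the v1beta1 types, savepointDir variable, and assert helper already present in the test:

// Equivalent to building `cluster` first and passing &cluster
// ("no job status, job is to be started" case from the test above).
take := canTakeSavepoint(&v1beta1.FlinkCluster{
    Spec: v1beta1.FlinkClusterSpec{
        Job: &v1beta1.JobSpec{SavepointsDir: &savepointDir},
    },
})
assert.Equal(t, take, true)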
