From 93f18e75f8cc3ffa3eb92992cb3c108eaa8a36d7 Mon Sep 17 00:00:00 2001 From: Filipe Regadas Date: Fri, 25 Feb 2022 10:11:16 +0000 Subject: [PATCH 1/2] Revert setting Jar/Py files env vars When needed these should be set by the user, reducing the level of "magic" things happening in the operator and keeps it inline with k8s philosophy. --- .../flinkcluster/flinkcluster_converter.go | 19 ------------------- .../flinkcluster_converter_test.go | 2 -- controllers/flinkcluster/flinkcluster_util.go | 4 ++++ 3 files changed, 4 insertions(+), 21 deletions(-) diff --git a/controllers/flinkcluster/flinkcluster_converter.go b/controllers/flinkcluster/flinkcluster_converter.go index 133d57aa..df0824d7 100644 --- a/controllers/flinkcluster/flinkcluster_converter.go +++ b/controllers/flinkcluster/flinkcluster_converter.go @@ -198,14 +198,6 @@ func newJobManagerPodSpec(mainContainer *corev1.Container, flinkCluster *v1beta1 setGCPConfig(flinkCluster.Spec.GCPConfig, podSpec) podSpec.Containers = append(podSpec.Containers, jobManagerSpec.Sidecars...) - jobSpec := flinkCluster.Spec.Job - if IsApplicationModeCluster(flinkCluster) && jobSpec.JarFile != nil { - envVars := []corev1.EnvVar{{Name: jobJarUriEnvVar, Value: *jobSpec.JarFile}} - mainContainer.Env = appendEnvVars(mainContainer.Env, envVars...) - podSpec.Containers = convertContainers(podSpec.Containers, []corev1.VolumeMount{}, envVars) - podSpec.InitContainers = convertContainers(podSpec.InitContainers, []corev1.VolumeMount{}, envVars) - } - return podSpec } @@ -476,14 +468,6 @@ func newTaskManagerPodSpec(mainContainer *corev1.Container, flinkCluster *v1beta setGCPConfig(flinkCluster.Spec.GCPConfig, podSpec) podSpec.Containers = append(podSpec.Containers, taskManagerSpec.Sidecars...) - jobSpec := flinkCluster.Spec.Job - if IsApplicationModeCluster(flinkCluster) && jobSpec.JarFile != nil { - envVars := []corev1.EnvVar{{Name: jobJarUriEnvVar, Value: *jobSpec.JarFile}} - mainContainer.Env = appendEnvVars(mainContainer.Env, envVars...) - podSpec.Containers = convertContainers(podSpec.Containers, []corev1.VolumeMount{}, envVars) - podSpec.InitContainers = convertContainers(podSpec.InitContainers, []corev1.VolumeMount{}, envVars) - } - return podSpec } @@ -663,17 +647,14 @@ func newJobSubmitterPodSpec(flinkCluster *v1beta1.FlinkCluster) *corev1.PodSpec if jobSpec.JarFile != nil { jobArgs = append(jobArgs, *jobSpec.JarFile) - envVars = addEnvVar(envVars, jobJarUriEnvVar, *jobSpec.JarFile) } if jobSpec.PyFile != nil { jobArgs = append(jobArgs, "--python", *jobSpec.PyFile) - envVars = addEnvVar(envVars, jobPyFileUriEnvVar, *jobSpec.PyFile) } if jobSpec.PyFiles != nil { jobArgs = append(jobArgs, "--pyFiles", *jobSpec.PyFiles) - envVars = addEnvVar(envVars, jobPyFilesUriEnvVar, *jobSpec.PyFiles) } if jobSpec.PyModule != nil { diff --git a/controllers/flinkcluster/flinkcluster_converter_test.go b/controllers/flinkcluster/flinkcluster_converter_test.go index cc83eda3..2a154d0c 100644 --- a/controllers/flinkcluster/flinkcluster_converter_test.go +++ b/controllers/flinkcluster/flinkcluster_converter_test.go @@ -847,7 +847,6 @@ func TestGetDesiredClusterState(t *testing.T) { Env: []corev1.EnvVar{ {Name: "FLINK_JM_ADDR", Value: "flinkjobcluster-sample-jobmanager:8081"}, {Name: "FOO", Value: "abc"}, - {Name: "FLINK_JOB_JAR_URI", Value: "/cache/my-job.jar"}, {Name: "HADOOP_CONF_DIR", Value: "/etc/hadoop/conf"}, {Name: "GOOGLE_APPLICATION_CREDENTIALS", Value: "/etc/gcp_service_account/gcp_service_account_key.json"}, }, @@ -874,7 +873,6 @@ func TestGetDesiredClusterState(t *testing.T) { Env: []corev1.EnvVar{ {Name: "FLINK_JM_ADDR", Value: "flinkjobcluster-sample-jobmanager:8081"}, {Name: "FOO", Value: "abc"}, - {Name: "FLINK_JOB_JAR_URI", Value: "/cache/my-job.jar"}, {Name: "HADOOP_CONF_DIR", Value: "/etc/hadoop/conf"}, { Name: "GOOGLE_APPLICATION_CREDENTIALS", diff --git a/controllers/flinkcluster/flinkcluster_util.go b/controllers/flinkcluster/flinkcluster_util.go index bdc93c32..922dddfa 100644 --- a/controllers/flinkcluster/flinkcluster_util.go +++ b/controllers/flinkcluster/flinkcluster_util.go @@ -563,3 +563,7 @@ func IsApplicationModeCluster(cluster *v1beta1.FlinkCluster) bool { jobSpec := cluster.Spec.Job return jobSpec != nil && *jobSpec.Mode == v1beta1.JobModeApplication } + +func IsApplicationMode(jobSpec *v1beta1.JobSpec) bool { + return jobSpec != nil && *jobSpec.Mode == v1beta1.JobModeApplication +} From b8ace6dee3ff53666a1fff4ae602dc8d088b7256 Mon Sep 17 00:00:00 2001 From: Filipe Regadas Date: Fri, 25 Feb 2022 10:14:52 +0000 Subject: [PATCH 2/2] remove unused --- controllers/flinkcluster/flinkcluster_util.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/controllers/flinkcluster/flinkcluster_util.go b/controllers/flinkcluster/flinkcluster_util.go index 922dddfa..bdc93c32 100644 --- a/controllers/flinkcluster/flinkcluster_util.go +++ b/controllers/flinkcluster/flinkcluster_util.go @@ -563,7 +563,3 @@ func IsApplicationModeCluster(cluster *v1beta1.FlinkCluster) bool { jobSpec := cluster.Spec.Job return jobSpec != nil && *jobSpec.Mode == v1beta1.JobModeApplication } - -func IsApplicationMode(jobSpec *v1beta1.JobSpec) bool { - return jobSpec != nil && *jobSpec.Mode == v1beta1.JobModeApplication -}