Skip to content

Commit

Permalink
Attach usrlib folder if remote files (#271)
Browse files Browse the repository at this point in the history
  • Loading branch information
regadas authored Feb 17, 2022
1 parent d79dd7c commit 46e73e4
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 30 deletions.
83 changes: 57 additions & 26 deletions controllers/flinkcluster/flinkcluster_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,24 +677,7 @@ func getDesiredJob(observed *ObservedClusterState) *batchv1.Job {
Name: jobManagerAddrEnvVar,
Value: jobManagerAddress,
})

if jobSpec.JarFile != nil {
jobArgs = append(jobArgs, getLocalPath(&envVars, jobJarUriEnvVar, *jobSpec.JarFile))
}

if jobSpec.PyFile != nil {
jobArgs = append(jobArgs, "--python", getLocalPath(&envVars, jobPyFileUriEnvVar, *jobSpec.PyFile))
}

if jobSpec.PyFiles != nil {
jobArgs = append(jobArgs, "--pyFiles", getLocalPath(&envVars, jobPyFilesUriEnvVar, *jobSpec.PyFiles))
}

if jobSpec.PyModule != nil {
jobArgs = append(jobArgs, "--pyModule", *jobSpec.PyModule)
}

jobArgs = append(jobArgs, jobSpec.Args...)
envVars = append(envVars, flinkCluster.Spec.EnvVars...)

var volumes []corev1.Volume
var volumeMounts []corev1.VolumeMount
Expand Down Expand Up @@ -730,7 +713,38 @@ func getDesiredJob(observed *ObservedClusterState) *batchv1.Job {
envVars = append(envVars, *saEnv)
}

envVars = append(envVars, flinkCluster.Spec.EnvVars...)
if jobSpec.JarFile != nil {
jobArgs = append(jobArgs, getLocalPath(*jobSpec.JarFile))
envVars = addEnvVar(envVars, jobJarUriEnvVar, *jobSpec.JarFile)
if isRemoteFile(*jobSpec.JarFile) {
volumes = addUsrLibVolume(volumes)
volumeMounts = addUsrLibVolumeMount(volumeMounts)
}
}

// Python entry file: pass its local path to the job via --python and record
// the original location in the PyFile-specific environment variable.
if jobSpec.PyFile != nil {
	jobArgs = append(jobArgs, "--python", getLocalPath(*jobSpec.PyFile))
	// Must be jobPyFileUriEnvVar: registering the Python file under
	// jobJarUriEnvVar would publish it under the jar's variable name.
	envVars = addEnvVar(envVars, jobPyFileUriEnvVar, *jobSpec.PyFile)
	// Remote files get a local path under usrLibDir, so the usrlib volume
	// has to be attached and mounted there.
	if isRemoteFile(*jobSpec.PyFile) {
		volumes = addUsrLibVolume(volumes)
		volumeMounts = addUsrLibVolumeMount(volumeMounts)
	}
}

if jobSpec.PyFiles != nil {
jobArgs = append(jobArgs, "--pyFiles", getLocalPath(*jobSpec.PyFiles))
envVars = addEnvVar(envVars, jobPyFilesUriEnvVar, *jobSpec.PyFiles)
if isRemoteFile(*jobSpec.PyFiles) {
volumes = addUsrLibVolume(volumes)
volumeMounts = addUsrLibVolumeMount(volumeMounts)
}
}

if jobSpec.PyModule != nil {
jobArgs = append(jobArgs, "--pyModule", *jobSpec.PyModule)
}

jobArgs = append(jobArgs, jobSpec.Args...)

var podSpec = corev1.PodSpec{
InitContainers: convertContainers(jobSpec.InitContainers, volumeMounts, envVars),
Expand Down Expand Up @@ -816,6 +830,17 @@ func convertFromSavepoint(jobSpec *v1beta1.JobSpec, jobStatus *v1beta1.JobStatus
return nil
}

// addUsrLibVolume returns volumes extended, via appendVolumes, with a volume
// named "usrlib" for which no explicit volume source is set.
func addUsrLibVolume(volumes []corev1.Volume) []corev1.Volume {
	usrLib := corev1.Volume{Name: "usrlib"}
	return appendVolumes(volumes, usrLib)
}

// addUsrLibVolumeMount returns volumeMounts extended, via appendVolumeMounts,
// with a mount of the "usrlib" volume at usrLibDir.
func addUsrLibVolumeMount(volumeMounts []corev1.VolumeMount) []corev1.VolumeMount {
	var usrLibMount corev1.VolumeMount
	usrLibMount.Name = "usrlib"
	usrLibMount.MountPath = usrLibDir
	return appendVolumeMounts(volumeMounts, usrLibMount)
}

func appendVolumes(volumes []corev1.Volume, newVolumes ...corev1.Volume) []corev1.Volume {
for _, mounts := range newVolumes {
var conflict = false
Expand Down Expand Up @@ -850,6 +875,13 @@ func appendVolumeMounts(volumeMounts []corev1.VolumeMount, newVolumeMounts ...co
return volumeMounts
}

// addEnvVar builds an environment variable from name and value and passes it,
// together with envVars, to appendEnvVars, returning the resulting slice.
func addEnvVar(envVars []corev1.EnvVar, name, value string) []corev1.EnvVar {
	entry := corev1.EnvVar{Name: name, Value: value}
	return appendEnvVars(envVars, entry)
}

func appendEnvVars(envVars []corev1.EnvVar, newEnvVars ...corev1.EnvVar) []corev1.EnvVar {
for _, envVar := range newEnvVars {
var conflict = false
Expand Down Expand Up @@ -1206,17 +1238,16 @@ func mergeLabels(labels1 map[string]string, labels2 map[string]string) map[strin
return mergedLabels
}

// isRemoteFile reports whether filePath contains the "://" scheme separator,
// which is how this file distinguishes a remote URI from a plain local path.
func isRemoteFile(filePath string) bool {
	const schemeSeparator = "://"
	return strings.Contains(filePath, schemeSeparator)
}

// getLocalPath rewrites the path to a local path under usrLibDir if the file
// is remote and returns the local path; a non-remote path is kept as is.
// The entrypoint script of the container will download it before submitting it to Flink.
func getLocalPath(envVars *[]corev1.EnvVar, envName string, filePath string) string {
*envVars = append(*envVars, corev1.EnvVar{
Name: envName,
Value: filePath,
})

func getLocalPath(filePath string) string {
var localPath = filePath
if strings.Contains(filePath, "://") {
if isRemoteFile(filePath) {
var parts = strings.Split(filePath, "/")
localPath = path.Join(usrLibDir, parts[len(parts)-1])
}
Expand Down
8 changes: 4 additions & 4 deletions controllers/flinkcluster/flinkcluster_converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,10 +846,10 @@ func TestGetDesiredClusterState(t *testing.T) {
Env: []corev1.EnvVar{
{Name: "FLINK_USR_LIB_DIR", Value: "/opt/flink/job"},
{Name: "FLINK_JM_ADDR", Value: "flinkjobcluster-sample-jobmanager:8081"},
{Name: "FLINK_JOB_JAR_URI", Value: "/cache/my-job.jar"},
{Name: "FOO", Value: "abc"},
{Name: "HADOOP_CONF_DIR", Value: "/etc/hadoop/conf"},
{Name: "GOOGLE_APPLICATION_CREDENTIALS", Value: "/etc/gcp_service_account/gcp_service_account_key.json"},
{Name: "FOO", Value: "abc"},
{Name: "FLINK_JOB_JAR_URI", Value: "/cache/my-job.jar"},
},
},
},
Expand All @@ -874,13 +874,13 @@ func TestGetDesiredClusterState(t *testing.T) {
Env: []corev1.EnvVar{
{Name: "FLINK_USR_LIB_DIR", Value: "/opt/flink/job"},
{Name: "FLINK_JM_ADDR", Value: "flinkjobcluster-sample-jobmanager:8081"},
{Name: "FLINK_JOB_JAR_URI", Value: "/cache/my-job.jar"},
{Name: "FOO", Value: "abc"},
{Name: "HADOOP_CONF_DIR", Value: "/etc/hadoop/conf"},
{
Name: "GOOGLE_APPLICATION_CREDENTIALS",
Value: "/etc/gcp_service_account/gcp_service_account_key.json",
},
{Name: "FOO", Value: "abc"},
{Name: "FLINK_JOB_JAR_URI", Value: "/cache/my-job.jar"},
},
EnvFrom: []corev1.EnvFromSource{
{
Expand Down

0 comments on commit 46e73e4

Please sign in to comment.