From 3e4885c041ff295c256112b4f890764ec6cda054 Mon Sep 17 00:00:00 2001 From: Filipe Regadas Date: Mon, 21 Jun 2021 14:47:07 +0100 Subject: [PATCH 1/6] Wip: Remove detached --- controllers/flinkcluster_converter.go | 11 ++++++----- controllers/flinkcluster_converter_test.go | 2 +- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/controllers/flinkcluster_converter.go b/controllers/flinkcluster_converter.go index 5f9daf7e..cef92229 100644 --- a/controllers/flinkcluster_converter.go +++ b/controllers/flinkcluster_converter.go @@ -91,8 +91,8 @@ func getDesiredJobManagerStatefulSet( return nil } - var clusterNamespace = flinkCluster.ObjectMeta.Namespace - var clusterName = flinkCluster.ObjectMeta.Name + var clusterNamespace = flinkCluster.Namespace + var clusterName = flinkCluster.Name var clusterSpec = flinkCluster.Spec var imageSpec = clusterSpec.Image var serviceAccount = clusterSpec.ServiceAccountName @@ -255,8 +255,8 @@ func getDesiredJobManagerService( return nil } - var clusterNamespace = flinkCluster.ObjectMeta.Namespace - var clusterName = flinkCluster.ObjectMeta.Name + var clusterNamespace = flinkCluster.Namespace + var clusterName = flinkCluster.Name var jobManagerSpec = flinkCluster.Spec.JobManager var rpcPort = corev1.ServicePort{ Name: "rpc", @@ -689,7 +689,8 @@ func getDesiredJob(observed *ObservedClusterState) *batchv1.Job { *jobSpec.NoLoggingToStdout { jobArgs = append(jobArgs, "--sysoutLogging") } - jobArgs = append(jobArgs, "--detached") + + // jobArgs = append(jobArgs, "--detached") var securityContext = jobSpec.SecurityContext diff --git a/controllers/flinkcluster_converter_test.go b/controllers/flinkcluster_converter_test.go index a6953602..6da110f4 100644 --- a/controllers/flinkcluster_converter_test.go +++ b/controllers/flinkcluster_converter_test.go @@ -850,7 +850,7 @@ func TestGetDesiredClusterState(t *testing.T) { "org.apache.flink.examples.java.wordcount.WordCount", "--parallelism", "2", - "--detached", + // "--detached", "/cache/my-job.jar", "--input", "./README.txt", From 96a80f3f4bc7167bfdeb9d4ed0429be27b598b73 Mon Sep 17 00:00:00 2001 From: Filipe Regadas Date: Tue, 22 Jun 2021 11:50:31 +0100 Subject: [PATCH 2/6] Revert "Wip: Remove detached" This reverts commit 3e4885c041ff295c256112b4f890764ec6cda054. --- controllers/flinkcluster_converter.go | 11 +++++------ controllers/flinkcluster_converter_test.go | 2 +- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/controllers/flinkcluster_converter.go b/controllers/flinkcluster_converter.go index cef92229..5f9daf7e 100644 --- a/controllers/flinkcluster_converter.go +++ b/controllers/flinkcluster_converter.go @@ -91,8 +91,8 @@ func getDesiredJobManagerStatefulSet( return nil } - var clusterNamespace = flinkCluster.Namespace - var clusterName = flinkCluster.Name + var clusterNamespace = flinkCluster.ObjectMeta.Namespace + var clusterName = flinkCluster.ObjectMeta.Name var clusterSpec = flinkCluster.Spec var imageSpec = clusterSpec.Image var serviceAccount = clusterSpec.ServiceAccountName @@ -255,8 +255,8 @@ func getDesiredJobManagerService( return nil } - var clusterNamespace = flinkCluster.Namespace - var clusterName = flinkCluster.Name + var clusterNamespace = flinkCluster.ObjectMeta.Namespace + var clusterName = flinkCluster.ObjectMeta.Name var jobManagerSpec = flinkCluster.Spec.JobManager var rpcPort = corev1.ServicePort{ Name: "rpc", @@ -689,8 +689,7 @@ func getDesiredJob(observed *ObservedClusterState) *batchv1.Job { *jobSpec.NoLoggingToStdout { jobArgs = append(jobArgs, "--sysoutLogging") } - - // jobArgs = append(jobArgs, "--detached") + jobArgs = append(jobArgs, "--detached") var securityContext = jobSpec.SecurityContext diff --git a/controllers/flinkcluster_converter_test.go b/controllers/flinkcluster_converter_test.go index 6da110f4..a6953602 100644 --- a/controllers/flinkcluster_converter_test.go +++ b/controllers/flinkcluster_converter_test.go @@ -850,7 +850,7 @@ func TestGetDesiredClusterState(t *testing.T) { "org.apache.flink.examples.java.wordcount.WordCount", "--parallelism", "2", - // "--detached", + "--detached", "/cache/my-job.jar", "--input", "./README.txt", From 38c8d370a33a7680331f7d829b3948e585711999 Mon Sep 17 00:00:00 2001 From: Filipe Regadas Date: Tue, 22 Jun 2021 14:00:38 +0100 Subject: [PATCH 3/6] Add Job mode property --- api/v1beta1/flinkcluster_default.go | 4 ++++ api/v1beta1/flinkcluster_default_test.go | 4 ++++ api/v1beta1/flinkcluster_types.go | 11 +++++++++++ api/v1beta1/flinkcluster_validate.go | 16 ++++++++++++++++ api/v1beta1/flinkcluster_validate_test.go | 4 ++++ api/v1beta1/zz_generated.deepcopy.go | 5 +++++ .../flinkoperator.k8s.io_flinkclusters.yaml | 1 - controllers/flinkcluster_converter.go | 18 +++++++++++++----- controllers/flinkcluster_converter_test.go | 2 ++ controllers/flinkcluster_util_test.go | 4 ++-- 10 files changed, 61 insertions(+), 8 deletions(-) diff --git a/api/v1beta1/flinkcluster_default.go b/api/v1beta1/flinkcluster_default.go index 380120ef..2f8471e6 100644 --- a/api/v1beta1/flinkcluster_default.go +++ b/api/v1beta1/flinkcluster_default.go @@ -145,6 +145,10 @@ func _SetJobDefault(jobSpec *JobSpec) { AfterJobCancelled: CleanupActionDeleteCluster, } } + if jobSpec.Mode == nil { + jobSpec.Mode = new(JobMode) + *jobSpec.Mode = JobModeDetached + } } func _SetHadoopConfigDefault(hadoopConfig *HadoopConfig) { diff --git a/api/v1beta1/flinkcluster_default_test.go b/api/v1beta1/flinkcluster_default_test.go index cef7a71a..32e9e0aa 100644 --- a/api/v1beta1/flinkcluster_default_test.go +++ b/api/v1beta1/flinkcluster_default_test.go @@ -40,6 +40,7 @@ func TestSetDefault(t *testing.T) { } _SetDefault(&cluster) + var defaultJobMode = JobMode(JobModeDetached) var defaultJmReplicas = int32(1) var defaultJmRPCPort = int32(6123) var defaultJmBlobPort = int32(6124) @@ -107,6 +108,7 @@ func TestSetDefault(t *testing.T) { AfterJobCancelled: "DeleteCluster", }, SecurityContext: nil, + Mode: &defaultJobMode, }, FlinkProperties: nil, HadoopConfig: &HadoopConfig{ @@ -127,6 +129,7 @@ func TestSetDefault(t *testing.T) { // Tests non-default values are not overwritten unexpectedly. func TestSetNonDefault(t *testing.T) { + var defaultJobMode = JobMode(JobModeDetached) var jmReplicas = int32(2) var jmRPCPort = int32(8123) var jmBlobPort = int32(8124) @@ -260,6 +263,7 @@ func TestSetNonDefault(t *testing.T) { AfterJobFails: "DeleteCluster", AfterJobCancelled: "KeepCluster", }, + Mode: &defaultJobMode, }, FlinkProperties: nil, HadoopConfig: &HadoopConfig{ diff --git a/api/v1beta1/flinkcluster_types.go b/api/v1beta1/flinkcluster_types.go index 0283994e..61e81cb3 100644 --- a/api/v1beta1/flinkcluster_types.go +++ b/api/v1beta1/flinkcluster_types.go @@ -41,6 +41,14 @@ const ( ComponentStateDeleted = "Deleted" ) +// JobMode +const ( + JobModeBlocking = "Blocking" + JobModeDetached = "Detached" +) + +type JobMode string + // JobState defines states for a Flink job deployment. const ( JobStatePending = "Pending" @@ -418,6 +426,9 @@ type JobSpec struct { Resources corev1.ResourceRequirements `json:"resources,omitempty"` SecurityContext *corev1.PodSecurityContext `json:"securityContext,omitempty"` + + // Job running mode + Mode *JobMode `json:"mode"` } // FlinkClusterSpec defines the desired state of FlinkCluster diff --git a/api/v1beta1/flinkcluster_validate.go b/api/v1beta1/flinkcluster_validate.go index 6c95904e..2ce4ba6e 100644 --- a/api/v1beta1/flinkcluster_validate.go +++ b/api/v1beta1/flinkcluster_validate.go @@ -477,6 +477,13 @@ func (v *Validator) validateJob(jobSpec *JobSpec) error { "property `cancelRequested` cannot be set to true for a new job") } + if jobSpec.Mode == nil { + return fmt.Errorf("job mode is unspecified") + } + if err := v.validateJobMode("mode", *jobSpec.Mode); err != nil { + return err + } + return nil } @@ -528,6 +535,15 @@ func (v *Validator) validateCleanupAction( return nil } +func (v *Validator) validateJobMode(property string, value JobMode) error { + switch value { + case JobModeDetached: + default: + return fmt.Errorf("invalid %v: %v", property, value) + } + return nil +} + func (v *Validator) validateRatio(ratio *int32, component, property string) error { if ratio == nil || *ratio > 100 || *ratio < 0 { return fmt.Errorf("invalid %v %v, it must be between 0 and 100", component, property) diff --git a/api/v1beta1/flinkcluster_validate_test.go b/api/v1beta1/flinkcluster_validate_test.go index d4840317..52a20c76 100644 --- a/api/v1beta1/flinkcluster_validate_test.go +++ b/api/v1beta1/flinkcluster_validate_test.go @@ -38,6 +38,7 @@ func TestValidateCreate(t *testing.T) { var parallelism int32 = 2 var restartPolicy = JobRestartPolicyFromSavepointOnFailure var memoryProcessRatio int32 = 25 + var jobMode JobMode = JobModeDetached var cluster = FlinkCluster{ ObjectMeta: metav1.ObjectMeta{ Name: "mycluster", @@ -77,6 +78,7 @@ func TestValidateCreate(t *testing.T) { AfterJobSucceeds: CleanupActionKeepCluster, AfterJobFails: CleanupActionDeleteTaskManager, }, + Mode: &jobMode, }, GCPConfig: &GCPConfig{ ServiceAccount: &GCPServiceAccount{ @@ -932,6 +934,7 @@ func getSimpleFlinkCluster() FlinkCluster { var parallelism int32 = 2 var restartPolicy = JobRestartPolicyFromSavepointOnFailure var savepointDir = "/savepoint_dir" + var jobMode JobMode = JobModeDetached return FlinkCluster{ ObjectMeta: metav1.ObjectMeta{ Name: "mycluster", @@ -974,6 +977,7 @@ func getSimpleFlinkCluster() FlinkCluster { AfterJobSucceeds: CleanupActionKeepCluster, AfterJobFails: CleanupActionDeleteTaskManager, }, + Mode: &jobMode, }, }, } diff --git a/api/v1beta1/zz_generated.deepcopy.go b/api/v1beta1/zz_generated.deepcopy.go index f6b53eaa..1ab93206 100644 --- a/api/v1beta1/zz_generated.deepcopy.go +++ b/api/v1beta1/zz_generated.deepcopy.go @@ -676,6 +676,11 @@ func (in *JobSpec) DeepCopyInto(out *JobSpec) { *out = new(v1.PodSecurityContext) (*in).DeepCopyInto(*out) } + if in.Mode != nil { + in, out := &in.Mode, &out.Mode + *out = new(JobMode) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new JobSpec. diff --git a/config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml b/config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml index ce52805c..464d9483 100644 --- a/config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml +++ b/config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml @@ -4,7 +4,6 @@ metadata: annotations: api-approved.kubernetes.io: unapproved controller-gen.kubebuilder.io/version: v0.5.1-0.20210408091555-18885b17ff7b - creationTimestamp: null name: flinkclusters.flinkoperator.k8s.io spec: group: flinkoperator.k8s.io diff --git a/controllers/flinkcluster_converter.go b/controllers/flinkcluster_converter.go index 5f9daf7e..7a31100b 100644 --- a/controllers/flinkcluster_converter.go +++ b/controllers/flinkcluster_converter.go @@ -91,8 +91,8 @@ func getDesiredJobManagerStatefulSet( return nil } - var clusterNamespace = flinkCluster.ObjectMeta.Namespace - var clusterName = flinkCluster.ObjectMeta.Name + var clusterNamespace = flinkCluster.Namespace + var clusterName = flinkCluster.Name var clusterSpec = flinkCluster.Spec var imageSpec = clusterSpec.Image var serviceAccount = clusterSpec.ServiceAccountName @@ -255,8 +255,8 @@ func getDesiredJobManagerService( return nil } - var clusterNamespace = flinkCluster.ObjectMeta.Namespace - var clusterName = flinkCluster.ObjectMeta.Name + var clusterNamespace = flinkCluster.Namespace + var clusterName = flinkCluster.Name var jobManagerSpec = flinkCluster.Spec.JobManager var rpcPort = corev1.ServicePort{ Name: "rpc", @@ -689,7 +689,15 @@ func getDesiredJob(observed *ObservedClusterState) *batchv1.Job { *jobSpec.NoLoggingToStdout { jobArgs = append(jobArgs, "--sysoutLogging") } - jobArgs = append(jobArgs, "--detached") + + if jobSpec.Mode != nil { + switch *jobSpec.Mode { + case v1beta1.JobModeDetached: + fallthrough + default: + jobArgs = append(jobArgs, "--detached") + } + } var securityContext = jobSpec.SecurityContext diff --git a/controllers/flinkcluster_converter_test.go b/controllers/flinkcluster_converter_test.go index a6953602..e265e52c 100644 --- a/controllers/flinkcluster_converter_test.go +++ b/controllers/flinkcluster_converter_test.go @@ -55,6 +55,7 @@ func TestGetDesiredClusterState(t *testing.T) { var memoryOffHeapRatio int32 = 25 var memoryOffHeapMin = resource.MustParse("600M") var memoryProcessRatio int32 = 80 + var jobMode v1beta1.JobMode = v1beta1.JobModeDetached var jobBackoffLimit int32 = 0 var jmReadinessProbe = corev1.Probe{ Handler: corev1.Handler{ @@ -152,6 +153,7 @@ func TestGetDesiredClusterState(t *testing.T) { corev1.ResourceMemory: resource.MustParse("512Mi"), }, }, + Mode: &jobMode, RestartPolicy: &restartPolicy, Volumes: []corev1.Volume{ { diff --git a/controllers/flinkcluster_util_test.go b/controllers/flinkcluster_util_test.go index 46334233..130db754 100644 --- a/controllers/flinkcluster_util_test.go +++ b/controllers/flinkcluster_util_test.go @@ -139,10 +139,10 @@ func TestNewRevision(t *testing.T) { var revision, _ = newRevision(&flinkCluster, 1, &collisionCount) var expectedRevision = appsv1.ControllerRevision{ ObjectMeta: metav1.ObjectMeta{ - Name: "mycluster-7bc87c954f", + Name: "mycluster-6889b5645c", Namespace: "default", Labels: map[string]string{ - "flinkoperator.k8s.io/hash": "7bc87c954f", + "flinkoperator.k8s.io/hash": "6889b5645c", "flinkoperator.k8s.io/managed-by": "mycluster", }, Annotations: map[string]string{}, From 331663fd1a13ae8ac06730be4037cd82118cc38d Mon Sep 17 00:00:00 2001 From: Filipe Regadas Date: Tue, 22 Jun 2021 14:07:35 +0100 Subject: [PATCH 4/6] fixup! Add Job mode property --- api/v1beta1/flinkcluster_default_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/v1beta1/flinkcluster_default_test.go b/api/v1beta1/flinkcluster_default_test.go index 32e9e0aa..33c5b476 100644 --- a/api/v1beta1/flinkcluster_default_test.go +++ b/api/v1beta1/flinkcluster_default_test.go @@ -40,7 +40,7 @@ func TestSetDefault(t *testing.T) { } _SetDefault(&cluster) - var defaultJobMode = JobMode(JobModeDetached) + var defaultJobMode JobMode = JobModeDetached var defaultJmReplicas = int32(1) var defaultJmRPCPort = int32(6123) var defaultJmBlobPort = int32(6124) From 5f4f20a5380d9368a76058b1788d477b9b813c19 Mon Sep 17 00:00:00 2001 From: Filipe Regadas Date: Tue, 22 Jun 2021 14:08:43 +0100 Subject: [PATCH 5/6] Add comment --- api/v1beta1/flinkcluster_types.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/v1beta1/flinkcluster_types.go b/api/v1beta1/flinkcluster_types.go index 61e81cb3..f2d3943b 100644 --- a/api/v1beta1/flinkcluster_types.go +++ b/api/v1beta1/flinkcluster_types.go @@ -41,7 +41,7 @@ const ( ComponentStateDeleted = "Deleted" ) -// JobMode +// JobMode defines the running mode for the job. const ( JobModeBlocking = "Blocking" JobModeDetached = "Detached" From 49b451709e6de111b8e80873d8f3dbd1c013bf0f Mon Sep 17 00:00:00 2001 From: Filipe Regadas Date: Wed, 23 Jun 2021 10:31:58 +0100 Subject: [PATCH 6/6] fix --- api/v1beta1/flinkcluster_types.go | 2 +- api/v1beta1/flinkcluster_validate.go | 1 + config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml | 2 ++ controllers/flinkcluster_converter.go | 3 +-- controllers/flinkcluster_util_test.go | 4 ++-- 5 files changed, 7 insertions(+), 5 deletions(-) diff --git a/api/v1beta1/flinkcluster_types.go b/api/v1beta1/flinkcluster_types.go index f2d3943b..ec61f642 100644 --- a/api/v1beta1/flinkcluster_types.go +++ b/api/v1beta1/flinkcluster_types.go @@ -428,7 +428,7 @@ type JobSpec struct { SecurityContext *corev1.PodSecurityContext `json:"securityContext,omitempty"` // Job running mode - Mode *JobMode `json:"mode"` + Mode *JobMode `json:"mode,omitempty"` } // FlinkClusterSpec defines the desired state of FlinkCluster diff --git a/api/v1beta1/flinkcluster_validate.go b/api/v1beta1/flinkcluster_validate.go index 2ce4ba6e..f2db19cf 100644 --- a/api/v1beta1/flinkcluster_validate.go +++ b/api/v1beta1/flinkcluster_validate.go @@ -537,6 +537,7 @@ func (v *Validator) validateCleanupAction( func (v *Validator) validateJobMode(property string, value JobMode) error { switch value { + case JobModeBlocking: case JobModeDetached: default: return fmt.Errorf("invalid %v: %v", property, value) diff --git a/config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml b/config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml index 464d9483..9823814c 100644 --- a/config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml +++ b/config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml @@ -724,6 +724,8 @@ spec: type: array jarFile: type: string + mode: + type: string noLoggingToStdout: type: boolean parallelism: diff --git a/controllers/flinkcluster_converter.go b/controllers/flinkcluster_converter.go index 7a31100b..6d2b31f4 100644 --- a/controllers/flinkcluster_converter.go +++ b/controllers/flinkcluster_converter.go @@ -692,9 +692,8 @@ func getDesiredJob(observed *ObservedClusterState) *batchv1.Job { if jobSpec.Mode != nil { switch *jobSpec.Mode { + case v1beta1.JobModeBlocking: case v1beta1.JobModeDetached: - fallthrough - default: jobArgs = append(jobArgs, "--detached") } } diff --git a/controllers/flinkcluster_util_test.go b/controllers/flinkcluster_util_test.go index 130db754..46334233 100644 --- a/controllers/flinkcluster_util_test.go +++ b/controllers/flinkcluster_util_test.go @@ -139,10 +139,10 @@ func TestNewRevision(t *testing.T) { var revision, _ = newRevision(&flinkCluster, 1, &collisionCount) var expectedRevision = appsv1.ControllerRevision{ ObjectMeta: metav1.ObjectMeta{ - Name: "mycluster-6889b5645c", + Name: "mycluster-7bc87c954f", Namespace: "default", Labels: map[string]string{ - "flinkoperator.k8s.io/hash": "6889b5645c", + "flinkoperator.k8s.io/hash": "7bc87c954f", "flinkoperator.k8s.io/managed-by": "mycluster", }, Annotations: map[string]string{},