Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Job mode property #52

Merged
merged 6 commits into from
Jun 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/v1beta1/flinkcluster_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ func _SetJobDefault(jobSpec *JobSpec) {
AfterJobCancelled: CleanupActionDeleteCluster,
}
}
if jobSpec.Mode == nil {
jobSpec.Mode = new(JobMode)
*jobSpec.Mode = JobModeDetached
}
}

func _SetHadoopConfigDefault(hadoopConfig *HadoopConfig) {
Expand Down
4 changes: 4 additions & 0 deletions api/v1beta1/flinkcluster_default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func TestSetDefault(t *testing.T) {
}
_SetDefault(&cluster)

var defaultJobMode JobMode = JobModeDetached
var defaultJmReplicas = int32(1)
var defaultJmRPCPort = int32(6123)
var defaultJmBlobPort = int32(6124)
Expand Down Expand Up @@ -107,6 +108,7 @@ func TestSetDefault(t *testing.T) {
AfterJobCancelled: "DeleteCluster",
},
SecurityContext: nil,
Mode: &defaultJobMode,
},
FlinkProperties: nil,
HadoopConfig: &HadoopConfig{
Expand All @@ -127,6 +129,7 @@ func TestSetDefault(t *testing.T) {

// Tests non-default values are not overwritten unexpectedly.
func TestSetNonDefault(t *testing.T) {
var defaultJobMode = JobMode(JobModeDetached)
var jmReplicas = int32(2)
var jmRPCPort = int32(8123)
var jmBlobPort = int32(8124)
Expand Down Expand Up @@ -260,6 +263,7 @@ func TestSetNonDefault(t *testing.T) {
AfterJobFails: "DeleteCluster",
AfterJobCancelled: "KeepCluster",
},
Mode: &defaultJobMode,
},
FlinkProperties: nil,
HadoopConfig: &HadoopConfig{
Expand Down
11 changes: 11 additions & 0 deletions api/v1beta1/flinkcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ const (
ComponentStateDeleted = "Deleted"
)

// JobMode defines the running mode for the job.
const (
JobModeBlocking = "Blocking"
JobModeDetached = "Detached"
)

type JobMode string

// JobState defines states for a Flink job deployment.
const (
JobStatePending = "Pending"
Expand Down Expand Up @@ -418,6 +426,9 @@ type JobSpec struct {
Resources corev1.ResourceRequirements `json:"resources,omitempty"`

SecurityContext *corev1.PodSecurityContext `json:"securityContext,omitempty"`

// Job running mode
Mode *JobMode `json:"mode,omitempty"`
}

// FlinkClusterSpec defines the desired state of FlinkCluster
Expand Down
17 changes: 17 additions & 0 deletions api/v1beta1/flinkcluster_validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,13 @@ func (v *Validator) validateJob(jobSpec *JobSpec) error {
"property `cancelRequested` cannot be set to true for a new job")
}

if jobSpec.Mode == nil {
return fmt.Errorf("job mode is unspecified")
}
if err := v.validateJobMode("mode", *jobSpec.Mode); err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -528,6 +535,16 @@ func (v *Validator) validateCleanupAction(
return nil
}

func (v *Validator) validateJobMode(property string, value JobMode) error {
switch value {
case JobModeBlocking:
case JobModeDetached:
default:
return fmt.Errorf("invalid %v: %v", property, value)
}
return nil
}

func (v *Validator) validateRatio(ratio *int32, component, property string) error {
if ratio == nil || *ratio > 100 || *ratio < 0 {
return fmt.Errorf("invalid %v %v, it must be between 0 and 100", component, property)
Expand Down
4 changes: 4 additions & 0 deletions api/v1beta1/flinkcluster_validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func TestValidateCreate(t *testing.T) {
var parallelism int32 = 2
var restartPolicy = JobRestartPolicyFromSavepointOnFailure
var memoryProcessRatio int32 = 25
var jobMode JobMode = JobModeDetached
var cluster = FlinkCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "mycluster",
Expand Down Expand Up @@ -77,6 +78,7 @@ func TestValidateCreate(t *testing.T) {
AfterJobSucceeds: CleanupActionKeepCluster,
AfterJobFails: CleanupActionDeleteTaskManager,
},
Mode: &jobMode,
},
GCPConfig: &GCPConfig{
ServiceAccount: &GCPServiceAccount{
Expand Down Expand Up @@ -932,6 +934,7 @@ func getSimpleFlinkCluster() FlinkCluster {
var parallelism int32 = 2
var restartPolicy = JobRestartPolicyFromSavepointOnFailure
var savepointDir = "/savepoint_dir"
var jobMode JobMode = JobModeDetached
return FlinkCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "mycluster",
Expand Down Expand Up @@ -974,6 +977,7 @@ func getSimpleFlinkCluster() FlinkCluster {
AfterJobSucceeds: CleanupActionKeepCluster,
AfterJobFails: CleanupActionDeleteTaskManager,
},
Mode: &jobMode,
},
},
}
Expand Down
5 changes: 5 additions & 0 deletions api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ metadata:
annotations:
api-approved.kubernetes.io: unapproved
controller-gen.kubebuilder.io/version: v0.5.1-0.20210408091555-18885b17ff7b
creationTimestamp: null
name: flinkclusters.flinkoperator.k8s.io
spec:
group: flinkoperator.k8s.io
Expand Down Expand Up @@ -725,6 +724,8 @@ spec:
type: array
jarFile:
type: string
mode:
type: string
noLoggingToStdout:
type: boolean
parallelism:
Expand Down
17 changes: 12 additions & 5 deletions controllers/flinkcluster_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ func getDesiredJobManagerStatefulSet(
return nil
}

var clusterNamespace = flinkCluster.ObjectMeta.Namespace
var clusterName = flinkCluster.ObjectMeta.Name
var clusterNamespace = flinkCluster.Namespace
var clusterName = flinkCluster.Name
var clusterSpec = flinkCluster.Spec
var imageSpec = clusterSpec.Image
var serviceAccount = clusterSpec.ServiceAccountName
Expand Down Expand Up @@ -255,8 +255,8 @@ func getDesiredJobManagerService(
return nil
}

var clusterNamespace = flinkCluster.ObjectMeta.Namespace
var clusterName = flinkCluster.ObjectMeta.Name
var clusterNamespace = flinkCluster.Namespace
var clusterName = flinkCluster.Name
var jobManagerSpec = flinkCluster.Spec.JobManager
var rpcPort = corev1.ServicePort{
Name: "rpc",
Expand Down Expand Up @@ -689,7 +689,14 @@ func getDesiredJob(observed *ObservedClusterState) *batchv1.Job {
*jobSpec.NoLoggingToStdout {
jobArgs = append(jobArgs, "--sysoutLogging")
}
jobArgs = append(jobArgs, "--detached")

if jobSpec.Mode != nil {
switch *jobSpec.Mode {
case v1beta1.JobModeBlocking:
case v1beta1.JobModeDetached:
jobArgs = append(jobArgs, "--detached")
}
}

var securityContext = jobSpec.SecurityContext

Expand Down
2 changes: 2 additions & 0 deletions controllers/flinkcluster_converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func TestGetDesiredClusterState(t *testing.T) {
var memoryOffHeapRatio int32 = 25
var memoryOffHeapMin = resource.MustParse("600M")
var memoryProcessRatio int32 = 80
var jobMode v1beta1.JobMode = v1beta1.JobModeDetached
var jobBackoffLimit int32 = 0
var jmReadinessProbe = corev1.Probe{
Handler: corev1.Handler{
Expand Down Expand Up @@ -152,6 +153,7 @@ func TestGetDesiredClusterState(t *testing.T) {
corev1.ResourceMemory: resource.MustParse("512Mi"),
},
},
Mode: &jobMode,
RestartPolicy: &restartPolicy,
Volumes: []corev1.Volume{
{
Expand Down