diff --git a/apis/flinkcluster/v1beta1/assets/test/flinkcluster_type_fetched.yaml b/apis/flinkcluster/v1beta1/assets/test/flinkcluster_type_fetched.yaml new file mode 100644 index 00000000..2417e1f8 --- /dev/null +++ b/apis/flinkcluster/v1beta1/assets/test/flinkcluster_type_fetched.yaml @@ -0,0 +1,53 @@ +metadata: + name: state-machine + namespace: default +spec: + flinkVersion: "1.15" + image: + name: arm64v8/flink:1.15 + pullPolicy: Always + jobManager: + replicas: 1 + accessScope: Cluster + ports: + rpc: 6123 + blob: 6124 + query: 6125 + ui: 8081 + resources: + limits: + cpu: "2" + memory: 2Gi + requests: + cpu: 200m + memory: 512Mi + memoryOffHeapMin: "0" + taskManager: + deploymentType: StatefulSet + replicas: 3 + ports: + data: 6121 + rpc: 6122 + query: 6125 + resources: + limits: + cpu: "2" + memory: 2Gi + requests: + cpu: 200m + memory: 512Mi + memoryOffHeapMin: "0" + job: + jarFile: ./examples/streaming/StateMachineExample.jar + allowNonRestoredState: false + parallelism: 1 + noLoggingToStdout: false + restartPolicy: Never + cleanupPolicy: + afterJobSucceeds: DeleteCluster + afterJobFails: KeepCluster + afterJobCancelled: DeleteCluster + mode: Detached + flinkProperties: + taskmanager.numberOfTaskSlots: "1" + recreateOnUpdate: true diff --git a/apis/flinkcluster/v1beta1/assets/test/flinkcluster_type_test.yaml b/apis/flinkcluster/v1beta1/assets/test/flinkcluster_type_test.yaml new file mode 100644 index 00000000..72c1af20 --- /dev/null +++ b/apis/flinkcluster/v1beta1/assets/test/flinkcluster_type_test.yaml @@ -0,0 +1,13 @@ +apiVersion: flinkoperator.k8s.io/v1beta1 +kind: FlinkCluster +metadata: + namespace: default + name: state-machine +spec: + flinkVersion: "1.15" + image: + name: arm64v8/flink:1.15 + job: + jarFile: ./examples/streaming/StateMachineExample.jar + flinkProperties: + taskmanager.numberOfTaskSlots: "1" diff --git a/apis/flinkcluster/v1beta1/flinkcluster_default.go b/apis/flinkcluster/v1beta1/flinkcluster_default.go index ca98421d..0bc30778 100644 --- a/apis/flinkcluster/v1beta1/flinkcluster_default.go +++ b/apis/flinkcluster/v1beta1/flinkcluster_default.go @@ -48,18 +48,12 @@ var ( // Sets default values for unspecified FlinkCluster properties. func _SetDefault(cluster *FlinkCluster) { - if cluster.Spec.RecreateOnUpdate == nil { - cluster.Spec.RecreateOnUpdate = new(bool) - *cluster.Spec.RecreateOnUpdate = true - } - if cluster.Spec.BatchSchedulerName != nil { cluster.Spec.BatchScheduler = &BatchSchedulerSpec{ Name: *cluster.Spec.BatchSchedulerName, } } - _SetImageDefault(&cluster.Spec.Image) flinkVersion, _ := version.NewVersion(cluster.Spec.FlinkVersion) if cluster.Spec.JobManager == nil { cluster.Spec.JobManager = &JobManagerSpec{} @@ -69,15 +63,6 @@ func _SetDefault(cluster *FlinkCluster) { cluster.Spec.TaskManager = &TaskManagerSpec{} } _SetTaskManagerDefault(cluster.Spec.TaskManager, flinkVersion) - _SetJobDefault(cluster.Spec.Job) - _SetHadoopConfigDefault(cluster.Spec.HadoopConfig) - -} - -func _SetImageDefault(imageSpec *ImageSpec) { - if len(imageSpec.PullPolicy) == 0 { - imageSpec.PullPolicy = corev1.PullAlways - } } func _SetJobManagerDefault(jmSpec *JobManagerSpec, flinkVersion *version.Version) { @@ -85,36 +70,6 @@ func _SetJobManagerDefault(jmSpec *JobManagerSpec, flinkVersion *version.Version return } - if jmSpec.Replicas == nil { - jmSpec.Replicas = new(int32) - *jmSpec.Replicas = DefaultJobManagerReplicas - } - if len(jmSpec.AccessScope) == 0 { - jmSpec.AccessScope = AccessScopeCluster - } - if jmSpec.Ingress != nil { - if jmSpec.Ingress.UseTLS == nil { - jmSpec.Ingress.UseTLS = new(bool) - *jmSpec.Ingress.UseTLS = false - } - } - if jmSpec.Ports.RPC == nil { - jmSpec.Ports.RPC = new(int32) - *jmSpec.Ports.RPC = 6123 - } - if jmSpec.Ports.Blob == nil { - jmSpec.Ports.Blob = new(int32) - *jmSpec.Ports.Blob = 6124 - } - if jmSpec.Ports.Query == nil { - jmSpec.Ports.Query = new(int32) - *jmSpec.Ports.Query = 6125 - } - if jmSpec.Ports.UI == nil { - jmSpec.Ports.UI = new(int32) - *jmSpec.Ports.UI = 8081 - } - if flinkVersion == nil || flinkVersion.LessThan(v10) { if jmSpec.MemoryOffHeapMin.Format == "" { jmSpec.MemoryOffHeapMin = *resource.NewScaledQuantity(600, 6) // 600MB @@ -129,9 +84,6 @@ func _SetJobManagerDefault(jmSpec *JobManagerSpec, flinkVersion *version.Version *jmSpec.MemoryProcessRatio = 80 } } - if jmSpec.Resources.Size() == 0 { - jmSpec.Resources = DefaultResources - } var livenessProbe = corev1.Probe{ Handler: corev1.Handler{ @@ -170,22 +122,6 @@ func _SetTaskManagerDefault(tmSpec *TaskManagerSpec, flinkVersion *version.Versi if tmSpec == nil { return } - if tmSpec.Replicas == nil { - tmSpec.Replicas = new(int32) - *tmSpec.Replicas = DefaultTaskManagerReplicas - } - if tmSpec.Ports.Data == nil { - tmSpec.Ports.Data = new(int32) - *tmSpec.Ports.Data = 6121 - } - if tmSpec.Ports.RPC == nil { - tmSpec.Ports.RPC = new(int32) - *tmSpec.Ports.RPC = 6122 - } - if tmSpec.Ports.Query == nil { - tmSpec.Ports.Query = new(int32) - *tmSpec.Ports.Query = 6125 - } if flinkVersion == nil || flinkVersion.LessThan(v10) { if tmSpec.MemoryOffHeapMin.Format == "" { tmSpec.MemoryOffHeapMin = *resource.NewScaledQuantity(600, 6) // 600MB @@ -200,9 +136,6 @@ func _SetTaskManagerDefault(tmSpec *TaskManagerSpec, flinkVersion *version.Versi *tmSpec.MemoryProcessRatio = 80 } } - if tmSpec.Resources.Size() == 0 { - tmSpec.Resources = DefaultResources - } var livenessProbe = corev1.Probe{ Handler: corev1.Handler{ @@ -235,49 +168,4 @@ func _SetTaskManagerDefault(tmSpec *TaskManagerSpec, flinkVersion *version.Versi mergo.Merge(&readinessProbe, tmSpec.ReadinessProbe, mergo.WithOverride) } tmSpec.ReadinessProbe = &readinessProbe - - if tmSpec.DeploymentType == "" { - tmSpec.DeploymentType = DeploymentTypeStatefulSet - } -} - -func _SetJobDefault(jobSpec *JobSpec) { - if jobSpec == nil { - return - } - if jobSpec.AllowNonRestoredState == nil { - jobSpec.AllowNonRestoredState = new(bool) - *jobSpec.AllowNonRestoredState = false - } - if jobSpec.NoLoggingToStdout == nil { - jobSpec.NoLoggingToStdout = new(bool) - *jobSpec.NoLoggingToStdout = false - } - if jobSpec.RestartPolicy == nil { - jobSpec.RestartPolicy = new(JobRestartPolicy) - *jobSpec.RestartPolicy = JobRestartPolicyNever - } - if jobSpec.CleanupPolicy == nil { - jobSpec.CleanupPolicy = &CleanupPolicy{ - AfterJobSucceeds: CleanupActionDeleteCluster, - AfterJobFails: CleanupActionKeepCluster, - AfterJobCancelled: CleanupActionDeleteCluster, - } - } - if jobSpec.Mode == nil { - jobSpec.Mode = new(JobMode) - *jobSpec.Mode = JobModeDetached - } - if jobSpec.Resources.Size() == 0 { - jobSpec.Resources = DefaultResources - } -} - -func _SetHadoopConfigDefault(hadoopConfig *HadoopConfig) { - if hadoopConfig == nil { - return - } - if len(hadoopConfig.MountPath) == 0 { - hadoopConfig.MountPath = "/etc/hadoop/conf" - } } diff --git a/apis/flinkcluster/v1beta1/flinkcluster_default_test.go b/apis/flinkcluster/v1beta1/flinkcluster_default_test.go index 72ef43dd..f597088f 100644 --- a/apis/flinkcluster/v1beta1/flinkcluster_default_test.go +++ b/apis/flinkcluster/v1beta1/flinkcluster_default_test.go @@ -31,35 +31,26 @@ import ( // Tests default values are set as expected. func TestSetDefault(t *testing.T) { + var clusterJmRPCPort = int32(6123) + var clusterTmRPCPort = int32(6122) var cluster = FlinkCluster{ Spec: FlinkClusterSpec{ Job: &JobSpec{}, JobManager: &JobManagerSpec{ Ingress: &JobManagerIngressSpec{}, + Ports: JobManagerPorts{RPC: &clusterJmRPCPort}, + }, + TaskManager: &TaskManagerSpec{ + Ports: TaskManagerPorts{RPC: &clusterTmRPCPort}, }, - HadoopConfig: &HadoopConfig{}, }, } _SetDefault(&cluster) - var defaultJobMode JobMode = JobModeDetached - var defaultJmReplicas = int32(1) var defaultJmRPCPort = int32(6123) - var defaultJmBlobPort = int32(6124) - var defaultJmQueryPort = int32(6125) - var defaultJmUIPort = int32(8081) - var defaultJmIngressTLSUse = false - var defaultTmDataPort = int32(6121) var defaultTmRPCPort = int32(6122) - var defaultTmQueryPort = int32(6125) - var defaultJobAllowNonRestoredState = false - var defaultJobNoLoggingToStdout = false - var defaultJobRestartPolicy = JobRestartPolicyNever var defaultMemoryOffHeapRatio = int32(25) var defaultMemoryOffHeapMin = resource.MustParse("600M") - var defaultRecreateOnUpdate = true - resources := DefaultResources - tmReplicas := int32(DefaultTaskManagerReplicas) var defaultJmReadinessProbe = corev1.Probe{ Handler: corev1.Handler{ TCPSocket: &corev1.TCPSocketAction{ @@ -111,22 +102,13 @@ func TestSetDefault(t *testing.T) { Spec: FlinkClusterSpec{ Image: ImageSpec{ Name: "", - PullPolicy: "Always", PullSecrets: nil, }, JobManager: &JobManagerSpec{ - Replicas: &defaultJmReplicas, - AccessScope: "Cluster", - Ingress: &JobManagerIngressSpec{ - UseTLS: &defaultJmIngressTLSUse, - }, + Ingress: &JobManagerIngressSpec{}, Ports: JobManagerPorts{ - RPC: &defaultJmRPCPort, - Blob: &defaultJmBlobPort, - Query: &defaultJmQueryPort, - UI: &defaultJmUIPort, + RPC: &defaultJmRPCPort, }, - Resources: resources, MemoryOffHeapRatio: &defaultMemoryOffHeapRatio, MemoryOffHeapMin: defaultMemoryOffHeapMin, Volumes: nil, @@ -136,14 +118,9 @@ func TestSetDefault(t *testing.T) { ReadinessProbe: &defaultJmReadinessProbe, }, TaskManager: &TaskManagerSpec{ - Replicas: &tmReplicas, - DeploymentType: DeploymentTypeStatefulSet, Ports: TaskManagerPorts{ - Data: &defaultTmDataPort, - RPC: &defaultTmRPCPort, - Query: &defaultTmQueryPort, + RPC: &defaultTmRPCPort, }, - Resources: resources, MemoryOffHeapRatio: &defaultMemoryOffHeapRatio, MemoryOffHeapMin: defaultMemoryOffHeapMin, Volumes: nil, @@ -152,24 +129,10 @@ func TestSetDefault(t *testing.T) { ReadinessProbe: &defaultTmReadinessProbe, }, Job: &JobSpec{ - AllowNonRestoredState: &defaultJobAllowNonRestoredState, - NoLoggingToStdout: &defaultJobNoLoggingToStdout, - RestartPolicy: &defaultJobRestartPolicy, - CleanupPolicy: &CleanupPolicy{ - AfterJobSucceeds: "DeleteCluster", - AfterJobFails: "KeepCluster", - AfterJobCancelled: "DeleteCluster", - }, SecurityContext: nil, - Mode: &defaultJobMode, - Resources: resources, }, FlinkProperties: nil, - HadoopConfig: &HadoopConfig{ - MountPath: "/etc/hadoop/conf", - }, - EnvVars: nil, - RecreateOnUpdate: &defaultRecreateOnUpdate, + EnvVars: nil, }, Status: FlinkClusterStatus{}, } @@ -183,7 +146,6 @@ func TestSetDefault(t *testing.T) { // Tests non-default values are not overwritten unexpectedly. func TestSetNonDefault(t *testing.T) { - var defaultJobMode = JobMode(JobModeDetached) var jmReplicas = int32(2) var jmRPCPort = int32(8123) var jmBlobPort = int32(8124) @@ -292,8 +254,7 @@ func TestSetNonDefault(t *testing.T) { ReadinessProbe: &jmReadinessProbe, }, TaskManager: &TaskManagerSpec{ - Replicas: &tmReplicas, - DeploymentType: DeploymentTypeDeployment, + Replicas: &tmReplicas, Ports: TaskManagerPorts{ Data: &tmDataPort, RPC: &tmRPCPort, @@ -311,16 +272,8 @@ func TestSetNonDefault(t *testing.T) { NoLoggingToStdout: &jobNoLoggingToStdout, RestartPolicy: &jobRestartPolicy, SecurityContext: &securityContext, - CleanupPolicy: &CleanupPolicy{ - AfterJobSucceeds: "DeleteTaskManagers", - AfterJobFails: "DeleteCluster", - AfterJobCancelled: "KeepCluster", - }, - }, - FlinkProperties: nil, - HadoopConfig: &HadoopConfig{ - MountPath: "/opt/flink/hadoop/conf", }, + FlinkProperties: nil, EnvVars: nil, RecreateOnUpdate: &recreateOnUpdate, }, @@ -382,8 +335,7 @@ func TestSetNonDefault(t *testing.T) { ReadinessProbe: &jmExpectedReadinessProbe, }, TaskManager: &TaskManagerSpec{ - Replicas: &tmReplicas, - DeploymentType: DeploymentTypeDeployment, + Replicas: &tmReplicas, Ports: TaskManagerPorts{ Data: &tmDataPort, RPC: &tmRPCPort, @@ -402,18 +354,8 @@ func TestSetNonDefault(t *testing.T) { NoLoggingToStdout: &jobNoLoggingToStdout, RestartPolicy: &jobRestartPolicy, SecurityContext: &securityContext, - CleanupPolicy: &CleanupPolicy{ - AfterJobSucceeds: "DeleteTaskManagers", - AfterJobFails: "DeleteCluster", - AfterJobCancelled: "KeepCluster", - }, - Mode: &defaultJobMode, - Resources: DefaultResources, - }, - FlinkProperties: nil, - HadoopConfig: &HadoopConfig{ - MountPath: "/opt/flink/hadoop/conf", }, + FlinkProperties: nil, EnvVars: nil, RecreateOnUpdate: &recreateOnUpdate, }, diff --git a/apis/flinkcluster/v1beta1/flinkcluster_types.go b/apis/flinkcluster/v1beta1/flinkcluster_types.go index 5d8a3742..2109eb76 100644 --- a/apis/flinkcluster/v1beta1/flinkcluster_types.go +++ b/apis/flinkcluster/v1beta1/flinkcluster_types.go @@ -125,6 +125,7 @@ type ImageSpec struct { // Image pull policy. One of `Always, Never, IfNotPresent`, default: `Always`. // if :latest tag is specified, or IfNotPresent otherwise. // [More info](https://kubernetes.io/docs/concepts/containers/images/#image-pull-policy) + // +kubebuilder:default:=Always PullPolicy corev1.PullPolicy `json:"pullPolicy,omitempty"` // _(Optional)_ Secrets for image pull. @@ -153,15 +154,27 @@ type NamedPort struct { // JobManagerPorts defines ports of JobManager. type JobManagerPorts struct { // RPC port, default: `6123`. + // +kubebuilder:validation:Minimum=1 + // +kubebuilder:validation:Maximum=65535 + // +kubebuilder:default:=6123 RPC *int32 `json:"rpc,omitempty"` // Blob port, default: `6124`. + // +kubebuilder:validation:Minimum=1 + // +kubebuilder:validation:Maximum=65535 + // +kubebuilder:default:=6124 Blob *int32 `json:"blob,omitempty"` // Query port, default: `6125`. + // +kubebuilder:validation:Minimum=1 + // +kubebuilder:validation:Maximum=65535 + // +kubebuilder:default:=6125 Query *int32 `json:"query,omitempty"` // UI port, default: `8081`. + // +kubebuilder:validation:Minimum=1 + // +kubebuilder:validation:Maximum=65535 + // +kubebuilder:default:=8081 UI *int32 `json:"ui,omitempty"` } @@ -175,6 +188,7 @@ type JobManagerIngressSpec struct { Annotations map[string]string `json:"annotations,omitempty"` // TLS use, default: `false`. + // +kubebuilder:default:=false UseTLS *bool `json:"useTls,omitempty"` // _(Optional)_TLS secret name. @@ -184,6 +198,7 @@ type JobManagerIngressSpec struct { // JobManagerSpec defines properties of JobManager. type JobManagerSpec struct { // The number of JobManager replicas, default: `1` + // +kubebuilder:default:=1 Replicas *int32 `json:"replicas,omitempty"` // Access scope, default: `Cluster`. @@ -193,6 +208,7 @@ type JobManagerSpec struct { // `NodePort`: accessible through node port. // `Headless`: pod IPs assumed to be routable and advertised directly with `clusterIP: None``. // Currently `VPC, External` are only available for GKE. + // +kubebuilder:default:=Cluster AccessScope string `json:"accessScope,omitempty"` // _(Optional)_ Define JobManager Service annotations for configuration. @@ -205,6 +221,7 @@ type JobManagerSpec struct { Ingress *JobManagerIngressSpec `json:"ingress,omitempty"` // Ports that JobManager listening on. + // +kubebuilder:default:={rpc:6123, blob:6124, query:6125, ui:8081} Ports JobManagerPorts `json:"ports,omitempty"` // _(Optional)_ Extra ports to be exposed. For example, Flink metrics reporter ports: Prometheus, JMX and so on. @@ -215,6 +232,7 @@ type JobManagerSpec struct { // default: 2 CPUs with 2Gi Memory. // It Cannot be updated. // [More info](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/) + // +kubebuilder:default:={requests:{cpu:"200m", memory:"512Mi"}, limits: {cpu:2, memory:"2Gi"}} Resources corev1.ResourceRequirements `json:"resources,omitempty"` // Percentage of off-heap memory in containers, as a safety margin to avoid OOM kill, default: `25` @@ -288,12 +306,21 @@ type JobManagerSpec struct { // TaskManagerPorts defines ports of TaskManager. type TaskManagerPorts struct { // Data port, default: `6121`. + // +kubebuilder:validation:Minimum=1 + // +kubebuilder:validation:Maximum=65535 + // +kubebuilder:default:=6121 Data *int32 `json:"data,omitempty"` // RPC port, default: `6122`. + // +kubebuilder:validation:Minimum=1 + // +kubebuilder:validation:Maximum=65535 + // +kubebuilder:default:=6122 RPC *int32 `json:"rpc,omitempty"` // Query port, default: `6125`. + // +kubebuilder:validation:Minimum=1 + // +kubebuilder:validation:Maximum=65535 + // +kubebuilder:default:=6125 Query *int32 `json:"query,omitempty"` } @@ -313,12 +340,15 @@ const ( // TaskManagerSpec defines properties of TaskManager. type TaskManagerSpec struct { // _(Optional)_ Defines the replica workload's type: `StatefulSet` or `Deployment`. If not specified, the default value is `StatefulSet`. + // +kubebuilder:default:=StatefulSet DeploymentType DeploymentType `json:"deploymentType,omitempty"` // The number of replicas. default: `3` + // +kubebuilder:default:=3 Replicas *int32 `json:"replicas,omitempty"` // Ports that TaskManager listening on. + // +kubebuilder:default:={data:6121, rpc:6122, query:6125} Ports TaskManagerPorts `json:"ports,omitempty"` // _(Optional)_ Extra ports to be exposed. For example, Flink metrics reporter ports: Prometheus, JMX and so on. @@ -328,6 +358,7 @@ type TaskManagerSpec struct { // default: 2 CPUs with 2Gi Memory. // It Cannot be updated. // [More info](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/) + // +kubebuilder:default:={requests:{cpu:"200m", memory:"512Mi"}, limits: {cpu:2, memory:"2Gi"}} Resources corev1.ResourceRequirements `json:"resources,omitempty"` // TODO: Memory calculation would be change. Let's watch the issue FLINK-13980. @@ -461,6 +492,7 @@ type JobSpec struct { FromSavepoint *string `json:"fromSavepoint,omitempty"` // Allow non-restored state, default: `false`. + // +kubebuilder:default:=false AllowNonRestoredState *bool `json:"allowNonRestoredState,omitempty"` // _(Optional)_ Savepoints dir where to store savepoints of the job. @@ -485,9 +517,11 @@ type JobSpec struct { SavepointGeneration int32 `json:"savepointGeneration,omitempty"` // Job parallelism, default: `1`. + // +kubebuilder:default:=1 Parallelism *int32 `json:"parallelism,omitempty"` // No logging output to STDOUT, default: `false`. + // +kubebuilder:default:=false NoLoggingToStdout *bool `json:"noLoggingToStdout,omitempty"` // _(Optional)_ Volumes in the Job pod. @@ -521,9 +555,11 @@ type JobSpec struct { // job from the savepoint recorded in the job status if available; otherwise, // the job will stay in failed state. This option is usually used together // with `autoSavepointSeconds` and `savepointsDir`. - RestartPolicy *JobRestartPolicy `json:"restartPolicy"` + // +kubebuilder:default:=Never + RestartPolicy *JobRestartPolicy `json:"restartPolicy,omitempty"` // The action to take after job finishes. + // +kubebuilder:default:={afterJobSucceeds:DeleteCluster, afterJobFails:KeepCluster, afterJobCancelled:DeleteCluster} CleanupPolicy *CleanupPolicy `json:"cleanupPolicy,omitempty"` // Deprecated: _(Optional)_ Request the job to be cancelled. Only applies to running jobs. If @@ -543,6 +579,7 @@ type JobSpec struct { // If omitted, a default value will be used. // It Cannot be updated. // More info: https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/ + // +kubebuilder:default:={requests:{cpu:"200m", memory:"512Mi"}, limits: {cpu:2, memory:"2Gi"}} Resources corev1.ResourceRequirements `json:"resources,omitempty"` // _(Optional)_ SecurityContext of the Job pod. @@ -554,6 +591,8 @@ type JobSpec struct { HostAliases []corev1.HostAlias `json:"hostAliases,omitempty"` // Job running mode, `"Blocking", "Detached"`, default: `"Detached"` + // +kubebuilder:validation:Enum=Detached;Blocking;Application + // +kubebuilder:default:=Detached Mode *JobMode `json:"mode,omitempty"` } @@ -596,9 +635,11 @@ type FlinkClusterSpec struct { BatchScheduler *BatchSchedulerSpec `json:"batchScheduler,omitempty"` // _(Optional)_ Flink JobManager spec. + // +kubebuilder:default:={replicas:1} JobManager *JobManagerSpec `json:"jobManager,omitempty"` // _(Optional)_ Flink TaskManager spec. + // +kubebuilder:default:={replicas:3} TaskManager *TaskManagerSpec `json:"taskManager,omitempty"` // _(Optional)_ Job spec. If specified, this cluster is an ephemeral Job @@ -637,6 +678,7 @@ type FlinkClusterSpec struct { RevisionHistoryLimit *int32 `json:"revisionHistoryLimit,omitempty"` // Recreate components when updating flinkcluster, default: true. + // +kubebuilder:default:=true RecreateOnUpdate *bool `json:"recreateOnUpdate,omitempty"` } @@ -648,6 +690,7 @@ type HadoopConfig struct { // The path where to mount the Volume of the ConfigMap. // default: `/etc/hadoop/conf`. + // +kubebuilder:default:=/etc/hadoop/conf MountPath string `json:"mountPath,omitempty"` } diff --git a/apis/flinkcluster/v1beta1/flinkcluster_types_test.go b/apis/flinkcluster/v1beta1/flinkcluster_types_test.go index eedc6686..1aca76a4 100644 --- a/apis/flinkcluster/v1beta1/flinkcluster_types_test.go +++ b/apis/flinkcluster/v1beta1/flinkcluster_types_test.go @@ -17,61 +17,60 @@ limitations under the License. package v1beta1 import ( + "encoding/json" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "k8s.io/apimachinery/pkg/util/yaml" + "os" + "path/filepath" + "time" "golang.org/x/net/context" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" ) // These tests are written in BDD-style using Ginkgo framework. Refer to // http://onsi.github.io/ginkgo to learn more. -var _ = Describe("FlinkCluster", func() { - var ( - key types.NamespacedName - created, fetched *FlinkCluster - ) - - BeforeEach(func() { - // Add any setup steps that needs to be executed before each test - }) - - AfterEach(func() { - // Add any teardown steps that needs to be executed after each test - }) - - // Add Tests for OpenAPI validation (or additonal CRD features) specified in - // your API definition. - // Avoid adding tests for vanilla CRUD operations because they would - // test Kubernetes API server, which isn't the goal here. - Context("Create API", func() { +var _ = Describe("FlinkCluster type", func() { - It("should create an object successfully", func() { + // Define utility constants for object names and testing timeouts/durations and intervals. + const ( + Namespace = "default" + JobName = "state-machine" - key = types.NamespacedName{ - Name: "foo", - Namespace: "default", - } - created = &FlinkCluster{ - ObjectMeta: metav1.ObjectMeta{ - Name: "foo", - Namespace: "default", - }} - - By("creating an API obj") - Expect(k8sClient.Create(context.TODO(), created)).To(Succeed()) - - fetched = &FlinkCluster{} - Expect(k8sClient.Get(context.TODO(), key, fetched)).To(Succeed()) - Expect(fetched).To(Equal(created)) + timeout = time.Second * 10 + duration = time.Second * 10 + interval = time.Millisecond * 250 + ) - By("deleting the created object") - Expect(k8sClient.Delete(context.TODO(), created)).To(Succeed()) - Expect(k8sClient.Get(context.TODO(), key, created)).ToNot(Succeed()) + var flinkJob FlinkCluster + flinkJobManifestPath, _ := filepath.Abs("assets/test/flinkcluster_type_test.yaml") + flinkJobManifest, _ := os.ReadFile(flinkJobManifestPath) + yaml.Unmarshal(flinkJobManifest, &flinkJob) + + Context("When creating FlinkCluster", func() { + It("Should set default values", func() { + By("By creating a new FlinkCluster") + ctx := context.Background() + + requested, _ := json.Marshal(flinkJob) + GinkgoWriter.Println("FlinkCluster requested: ", string(requested)) + Expect(k8sClient.Create(ctx, &flinkJob)).Should(Succeed()) + + jobLookupKey := types.NamespacedName{Name: JobName, Namespace: Namespace} + createdjob := &FlinkCluster{} + + // We'll need to retry getting this newly created FlinkCluster, given that creation may not immediately happen. + Eventually(func() bool { + err := k8sClient.Get(ctx, jobLookupKey, createdjob) + if err != nil { + return false + } + created, _ := json.Marshal(createdjob) + GinkgoWriter.Println("FlinkCluster created: ", string(created)) + return true + }, timeout, interval).Should(BeTrue()) }) - }) - }) diff --git a/config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml b/config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml index b8c73c33..0f6678f8 100644 --- a/config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml +++ b/config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml @@ -155,6 +155,7 @@ spec: configMapName: type: string mountPath: + default: /etc/hadoop/conf type: string type: object image: @@ -162,6 +163,7 @@ spec: name: type: string pullPolicy: + default: Always type: string pullSecrets: items: @@ -176,6 +178,7 @@ spec: job: properties: allowNonRestoredState: + default: false type: boolean args: items: @@ -193,6 +196,10 @@ spec: type: string type: array cleanupPolicy: + default: + afterJobCancelled: DeleteCluster + afterJobFails: KeepCluster + afterJobSucceeds: DeleteCluster properties: afterJobCancelled: type: string @@ -778,14 +785,21 @@ spec: minimum: 0 type: integer mode: + default: Detached + enum: + - Detached + - Blocking + - Application type: string noLoggingToStdout: + default: false type: boolean nodeSelector: additionalProperties: type: string type: object parallelism: + default: 1 format: int32 type: integer podAnnotations: @@ -803,6 +817,13 @@ spec: pyModule: type: string resources: + default: + limits: + cpu: 2 + memory: 2Gi + requests: + cpu: 200m + memory: 512Mi properties: limits: additionalProperties: @@ -822,6 +843,7 @@ spec: type: object type: object restartPolicy: + default: Never type: string savepointGeneration: format: int32 @@ -1627,10 +1649,10 @@ spec: - name type: object type: array - required: - - restartPolicy type: object jobManager: + default: + replicas: 1 properties: ServiceAnnotations: additionalProperties: @@ -1641,6 +1663,7 @@ spec: type: string type: object accessScope: + default: Cluster type: string extraPorts: items: @@ -1684,6 +1707,7 @@ spec: tlsSecretName: type: string useTls: + default: false type: boolean type: object initContainers: @@ -2336,18 +2360,35 @@ spec: type: string type: object ports: + default: + blob: 6124 + query: 6125 + rpc: 6123 + ui: 8081 properties: blob: + default: 6124 format: int32 + maximum: 65535 + minimum: 1 type: integer query: + default: 6125 format: int32 + maximum: 65535 + minimum: 1 type: integer rpc: + default: 6123 format: int32 + maximum: 65535 + minimum: 1 type: integer ui: + default: 8081 format: int32 + maximum: 65535 + minimum: 1 type: integer type: object readinessProbe: @@ -2419,9 +2460,17 @@ spec: type: integer type: object replicas: + default: 1 format: int32 type: integer resources: + default: + limits: + cpu: 2 + memory: 2Gi + requests: + cpu: 200m + memory: 512Mi properties: limits: additionalProperties: @@ -3946,6 +3995,7 @@ spec: type: string type: object recreateOnUpdate: + default: true type: boolean revisionHistoryLimit: format: int32 @@ -3953,8 +4003,11 @@ spec: serviceAccountName: type: string taskManager: + default: + replicas: 3 properties: deploymentType: + default: StatefulSet type: string extraPorts: items: @@ -4637,15 +4690,28 @@ spec: type: string type: object ports: + default: + data: 6121 + query: 6125 + rpc: 6122 properties: data: + default: 6121 format: int32 + maximum: 65535 + minimum: 1 type: integer query: + default: 6125 format: int32 + maximum: 65535 + minimum: 1 type: integer rpc: + default: 6122 format: int32 + maximum: 65535 + minimum: 1 type: integer type: object readinessProbe: @@ -4717,9 +4783,17 @@ spec: type: integer type: object replicas: + default: 3 format: int32 type: integer resources: + default: + limits: + cpu: 2 + memory: 2Gi + requests: + cpu: 200m + memory: 512Mi properties: limits: additionalProperties: diff --git a/controllers/flinkcluster/flinkcluster_util_test.go b/controllers/flinkcluster/flinkcluster_util_test.go index 9942d9b4..cb2ab2e2 100644 --- a/controllers/flinkcluster/flinkcluster_util_test.go +++ b/controllers/flinkcluster/flinkcluster_util_test.go @@ -117,10 +117,10 @@ func TestNewRevision(t *testing.T) { var revision, _ = newRevision(&flinkCluster, 1, &collisionCount) var expectedRevision = appsv1.ControllerRevision{ ObjectMeta: metav1.ObjectMeta{ - Name: "mycluster-7bc87c954f", + Name: "mycluster-fb7687bf5", Namespace: "default", Labels: map[string]string{ - "flinkoperator.k8s.io/hash": "7bc87c954f", + "flinkoperator.k8s.io/hash": "fb7687bf5", "flinkoperator.k8s.io/managed-by": "mycluster", }, Annotations: map[string]string{}, diff --git a/controllers/flinkcluster/suite_test.go b/controllers/flinkcluster/suite_test.go index 4bfbc6dd..56757523 100644 --- a/controllers/flinkcluster/suite_test.go +++ b/controllers/flinkcluster/suite_test.go @@ -17,7 +17,11 @@ limitations under the License. package flinkcluster import ( + "context" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" "path/filepath" + ctrl "sigs.k8s.io/controller-runtime" "testing" . "github.com/onsi/ginkgo/v2" @@ -35,8 +39,13 @@ import ( // These tests use Ginkgo (BDD-style Go testing framework). Refer to // http://onsi.github.io/ginkgo/ to learn more about Ginkgo. -var k8sClient client.Client -var testEnv *envtest.Environment +var ( + cfg *rest.Config + k8sClient client.Client // You'll be using this client in your tests. + testEnv *envtest.Environment + ctx context.Context + cancel context.CancelFunc +) func TestAPIs(t *testing.T) { RegisterFailHandler(Fail) @@ -46,6 +55,7 @@ func TestAPIs(t *testing.T) { var _ = BeforeSuite(func() { logf.SetLogger(zap.New(zap.UseDevMode(true), zap.WriteTo(GinkgoWriter))) + ctx, cancel = context.WithCancel(context.TODO()) By("bootstrapping test environment") testEnv = &envtest.Environment{ @@ -59,17 +69,35 @@ var _ = BeforeSuite(func() { err = v1beta1.AddToScheme(scheme.Scheme) Expect(err).NotTo(HaveOccurred()) - err = v1beta1.AddToScheme(scheme.Scheme) - Expect(err).NotTo(HaveOccurred()) - // +kubebuilder:scaffold:scheme k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme}) Expect(err).ToNot(HaveOccurred()) Expect(k8sClient).ToNot(BeNil()) + k8sManager, err := ctrl.NewManager(cfg, ctrl.Options{ + Scheme: scheme.Scheme, + }) + Expect(err).ToNot(HaveOccurred()) + + cs, err := kubernetes.NewForConfig(k8sManager.GetConfig()) + Expect(err).ToNot(HaveOccurred()) + + err = (&FlinkClusterReconciler{ + Client: k8sManager.GetClient(), + Clientset: cs, + Log: ctrl.Log.WithName("controllers").WithName("FlinkCluster"), + }).SetupWithManager(k8sManager, 1) + Expect(err).ToNot(HaveOccurred()) + + go func() { + defer GinkgoRecover() + err = k8sManager.Start(ctx) + Expect(err).ToNot(HaveOccurred(), "failed to run manager") + }() }) var _ = AfterSuite(func() { + cancel() By("tearing down the test environment") err := testEnv.Stop() Expect(err).ToNot(HaveOccurred())