diff --git a/apis/flinkcluster/v1beta1/flinkcluster_default.go b/apis/flinkcluster/v1beta1/flinkcluster_default.go
index 1c538c28..7ac0956e 100644
--- a/apis/flinkcluster/v1beta1/flinkcluster_default.go
+++ b/apis/flinkcluster/v1beta1/flinkcluster_default.go
@@ -232,6 +232,10 @@ func _SetTaskManagerDefault(tmSpec *TaskManagerSpec, flinkVersion *version.Versi
 		mergo.Merge(&readinessProbe, tmSpec.ReadinessProbe, mergo.WithOverride)
 	}
 	tmSpec.ReadinessProbe = &readinessProbe
+
+	if tmSpec.DeploymentType == "" {
+		tmSpec.DeploymentType = DeploymentTypeStatefulSet
+	}
 }

 func _SetJobDefault(jobSpec *JobSpec) {
diff --git a/apis/flinkcluster/v1beta1/flinkcluster_default_test.go b/apis/flinkcluster/v1beta1/flinkcluster_default_test.go
index 40cd5184..72ef43dd 100644
--- a/apis/flinkcluster/v1beta1/flinkcluster_default_test.go
+++ b/apis/flinkcluster/v1beta1/flinkcluster_default_test.go
@@ -20,6 +20,7 @@ import (
 	"testing"

 	"github.com/google/go-cmp/cmp/cmpopts"
+
 	"k8s.io/apimachinery/pkg/api/resource"
 	"k8s.io/apimachinery/pkg/util/intstr"

@@ -135,7 +136,8 @@ func TestSetDefault(t *testing.T) {
 			ReadinessProbe: &defaultJmReadinessProbe,
 		},
 		TaskManager: &TaskManagerSpec{
-			Replicas: &tmReplicas,
+			Replicas: &tmReplicas,
+			DeploymentType: DeploymentTypeStatefulSet,
 			Ports: TaskManagerPorts{
 				Data: &defaultTmDataPort,
 				RPC: &defaultTmRPCPort,
@@ -290,7 +292,8 @@ func TestSetNonDefault(t *testing.T) {
 			ReadinessProbe: &jmReadinessProbe,
 		},
 		TaskManager: &TaskManagerSpec{
-			Replicas: &tmReplicas,
+			Replicas: &tmReplicas,
+			DeploymentType: DeploymentTypeDeployment,
 			Ports: TaskManagerPorts{
 				Data: &tmDataPort,
 				RPC: &tmRPCPort,
@@ -379,7 +382,8 @@ func TestSetNonDefault(t *testing.T) {
 			ReadinessProbe: &jmExpectedReadinessProbe,
 		},
 		TaskManager: &TaskManagerSpec{
-			Replicas: &tmReplicas,
+			Replicas: &tmReplicas,
+			DeploymentType: DeploymentTypeDeployment,
 			Ports: TaskManagerPorts{
 				Data: &tmDataPort,
 				RPC: &tmRPCPort,
diff --git a/apis/flinkcluster/v1beta1/flinkcluster_types.go b/apis/flinkcluster/v1beta1/flinkcluster_types.go
index a58d4aef..88723582 100644
--- a/apis/flinkcluster/v1beta1/flinkcluster_types.go
+++ b/apis/flinkcluster/v1beta1/flinkcluster_types.go
@@ -293,8 +293,24 @@ type TaskManagerPorts struct {
 	Query *int32 `json:"query,omitempty"`
 }

+// DeploymentType is the Kubernetes workload API kind used for the TaskManager workers.
+type DeploymentType string
+
+const (
+	// This refers to the Kubernetes [StatefulSet](https://kubernetes.io/docs/concepts/workloads/controllers/statefulset) type.
+	// Uses persistent volumes for recovery.
+	DeploymentTypeStatefulSet DeploymentType = "StatefulSet"
+
+	// This refers to the Kubernetes [Deployment](https://kubernetes.io/docs/concepts/workloads/controllers/deployment) type.
+	// Faster startup, but the volumes are ephemeral.
+	DeploymentTypeDeployment DeploymentType = "Deployment"
+)
+
 // TaskManagerSpec defines properties of TaskManager.
 type TaskManagerSpec struct {
+	// _(Optional)_ Defines the replica workload's type. If not specified, the default value is `StatefulSet`.
+	DeploymentType DeploymentType `json:"deploymentType,omitempty"`
+
 	// The number of replicas. default: `3`
 	Replicas *int32 `json:"replicas,omitempty"`

@@ -334,6 +350,12 @@ type TaskManagerSpec struct {
 	// _(Optional)_ A template for persistent volume claim each requested and mounted to JobManager pod,
 	// This can be used to mount an external volume with a specific storageClass or larger captivity (for larger/faster state backend).
 	// [More info](https://kubernetes.io/docs/concepts/storage/persistent-volumes/#persistentvolumeclaims)
+	//
+	// If deploymentType: StatefulSet is used, these templates will be added to the TaskManager StatefulSet template,
+	// hence mounting persistent PVCs to the indexed StatefulSet pods.
+	//
+	// If deploymentType: Deployment is used, these templates are appended as ephemeral volumes to the PodSpec,
+	// hence mounting ephemeral PVCs to the ReplicaSet pods.
 	VolumeClaimTemplates []corev1.PersistentVolumeClaim `json:"volumeClaimTemplates,omitempty"`

 	// _(Optional)_ Init containers of the Task Manager pod.
@@ -664,7 +686,10 @@ type FlinkClusterComponentsStatus struct {
 	JobManagerIngress *JobManagerIngressStatus `json:"jobManagerIngress,omitempty"`

 	// The state of TaskManager StatefulSet.
-	TaskManagerStatefulSet FlinkClusterComponentState `json:"taskManagerStatefulSet"`
+	TaskManagerStatefulSet FlinkClusterComponentState `json:"taskManagerStatefulSet,omitempty"`
+
+	// The state of TaskManager Deployment.
+	TaskManagerDeployment FlinkClusterComponentState `json:"taskManagerDeployment,omitempty"`

 	// The status of the job, available only when JobSpec is provided.
 	Job *JobStatus `json:"job,omitempty"`
diff --git a/apis/flinkcluster/v1beta1/flinkcluster_validate.go b/apis/flinkcluster/v1beta1/flinkcluster_validate.go
index 2d629c49..6a3489f7 100644
--- a/apis/flinkcluster/v1beta1/flinkcluster_validate.go
+++ b/apis/flinkcluster/v1beta1/flinkcluster_validate.go
@@ -118,6 +118,11 @@ func (v *Validator) ValidateUpdate(old *FlinkCluster, new *FlinkCluster) error {
 		return nil
 	}

+	err = v.validateTaskManagerUpdate(old, new)
+	if err != nil {
+		return err
+	}
+
 	err = v.validateJobUpdate(old, new)
 	if err != nil {
 		return err
@@ -225,6 +230,14 @@ func (v *Validator) checkSavepointGeneration(
 		"you cannot update savepointGeneration with others at the same time")
 }

+func (v *Validator) validateTaskManagerUpdate(old *FlinkCluster, new *FlinkCluster) error {
+	if old.Spec.TaskManager.DeploymentType != new.Spec.TaskManager.DeploymentType {
+		return fmt.Errorf(
+			"updating deploymentType is not allowed")
+	}
+	return nil
+}
+
 // Validate job update.
 func (v *Validator) validateJobUpdate(old *FlinkCluster, new *FlinkCluster) error {
 	switch {
diff --git a/apis/flinkcluster/v1beta1/flinkcluster_validate_test.go b/apis/flinkcluster/v1beta1/flinkcluster_validate_test.go
index 756655d3..ded952ac 100644
--- a/apis/flinkcluster/v1beta1/flinkcluster_validate_test.go
+++ b/apis/flinkcluster/v1beta1/flinkcluster_validate_test.go
@@ -615,6 +615,16 @@ func TestUpdateSavepointGeneration(t *testing.T) {
 	assert.Equal(t, err, nil)
 }

+func TestTaskManagerDeploymentTypeUpdate(t *testing.T) {
+	// cannot update deploymentType
+	var oldCluster = getSimpleFlinkCluster()
+	var newCluster = getSimpleFlinkCluster()
+	newCluster.Spec.TaskManager.DeploymentType = DeploymentTypeDeployment
+	err := validator.ValidateUpdate(&oldCluster, &newCluster)
+	expectedErr := "updating deploymentType is not allowed"
+	assert.Equal(t, err.Error(), expectedErr)
+}
+
 func TestUpdateJob(t *testing.T) {
 	var validator = &Validator{}
 	var tc = &TimeConverter{}
diff --git a/apis/flinkcluster/v1beta1/zz_generated.deepcopy.go b/apis/flinkcluster/v1beta1/zz_generated.deepcopy.go
index a43da436..1f1990c6 100644
--- a/apis/flinkcluster/v1beta1/zz_generated.deepcopy.go
+++ b/apis/flinkcluster/v1beta1/zz_generated.deepcopy.go
@@ -110,6 +110,7 @@ func (in *FlinkClusterComponentsStatus) DeepCopyInto(out *FlinkClusterComponents
 		(*in).DeepCopyInto(*out)
 	}
 	out.TaskManagerStatefulSet = in.TaskManagerStatefulSet
+	out.TaskManagerDeployment = in.TaskManagerDeployment
 	if in.Job != nil {
 		in, out := &in.Job, &out.Job
 		*out = new(JobStatus)
diff --git a/config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml b/config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml
index 28ca3795..713b04e9 100644
--- a/config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml
+++ b/config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml
@@ -3932,6 +3932,8 @@ spec:
                 type: string
               taskManager:
                 properties:
+                  deploymentType:
+                    type: string
                   extraPorts:
                     items:
                       properties:
@@ -6325,6 +6327,16 @@ spec:
                   - name
                   - state
                   type: object
+                taskManagerDeployment:
+                  properties:
+                    name:
+                      type: string
+                    state:
+                      type: string
+                  required:
+                  - name
+                  - state
+                  type: object
                 taskManagerStatefulSet:
                   properties:
                     name:
@@ -6339,7 +6351,6 @@ spec:
                      type: string
                required:
                - configMap
                - jobManagerService
                - jobManagerStatefulSet
-               - taskManagerStatefulSet
                type: object
              control:
                properties:
diff --git a/controllers/flinkcluster/flinkcluster_controller.go b/controllers/flinkcluster/flinkcluster_controller.go
index 5de29f9e..90b06b55 100644
--- a/controllers/flinkcluster/flinkcluster_controller.go
+++ b/controllers/flinkcluster/flinkcluster_controller.go
@@ -213,9 +213,12 @@ func (handler *FlinkClusterHandler) reconcile(ctx context.Context,
 	}
 	if desired.TmStatefulSet != nil {
 		log.Info("Desired state", "TaskManager StatefulSet", *desired.TmStatefulSet)
+	} else if desired.TmDeployment != nil {
+		log.Info("Desired state", "TaskManager Deployment", *desired.TmDeployment)
 	} else {
-		log.Info("Desired state", "TaskManager StatefulSet", "nil")
+		log.Info("Desired state", "TaskManager", "nil")
 	}
+
 	if desired.Job != nil {
 		log.Info("Desired state", "Job", *desired.Job)
 	} else {
diff --git a/controllers/flinkcluster/flinkcluster_converter.go b/controllers/flinkcluster/flinkcluster_converter.go
index 56da2c4d..e23f3047 100644
--- a/controllers/flinkcluster/flinkcluster_converter.go
+++ b/controllers/flinkcluster/flinkcluster_converter.go
@@ -98,8 +98,14 @@ func getDesiredClusterState(observed *ObservedClusterState) *model.DesiredCluste
 		state.JmStatefulSet = newJobManagerStatefulSet(cluster)
 	}

-	if !shouldCleanup(cluster, "TaskManagerStatefulSet") {
-		state.TmStatefulSet = newTaskManagerStatefulSet(cluster)
+	if cluster.Spec.TaskManager.DeploymentType == v1beta1.DeploymentTypeStatefulSet {
+		if !shouldCleanup(cluster, "TaskManagerStatefulSet") {
+			state.TmStatefulSet = newTaskManagerStatefulSet(cluster)
+		}
+	} else {
+		if !shouldCleanup(cluster, "TaskManagerDeployment") {
+			state.TmDeployment = newTaskManagerDeployment(cluster)
+		}
 	}

 	if !shouldCleanup(cluster, "TaskManagerService") {
@@ -406,7 +412,7 @@ func newJobManagerIngress(
 	return jobManagerIngress
 }

-func newTaskMangerContainer(flinkCluster *v1beta1.FlinkCluster) *corev1.Container {
+func newTaskManagerContainer(flinkCluster *v1beta1.FlinkCluster) *corev1.Container {
 	var imageSpec = flinkCluster.Spec.Image
 	var taskManagerSpec = flinkCluster.Spec.TaskManager
 	var dataPort = corev1.ContainerPort{Name: "data", ContainerPort: *taskManagerSpec.Ports.Data}
@@ -498,7 +504,7 @@ func newTaskManagerStatefulSet(flinkCluster *v1beta1.FlinkCluster) *appsv1.State
 	podLabels = mergeLabels(podLabels, taskManagerSpec.PodLabels)
 	var statefulSetLabels = mergeLabels(podLabels, getRevisionHashLabels(&flinkCluster.Status.Revision))

-	mainContainer := newTaskMangerContainer(flinkCluster)
+	mainContainer := newTaskManagerContainer(flinkCluster)
 	podSpec := newTaskManagerPodSpec(mainContainer, flinkCluster)

 	var pvcs []corev1.PersistentVolumeClaim
@@ -534,6 +540,58 @@ func newTaskManagerStatefulSet(flinkCluster *v1beta1.FlinkCluster) *appsv1.State
 	}
 }

+func getEphemeralVolumesFromTaskManagerSpec(flinkCluster *v1beta1.FlinkCluster) []corev1.Volume {
+	var ephemeralVolumes []corev1.Volume
+	var volumeClaimsInSpec = flinkCluster.Spec.TaskManager.VolumeClaimTemplates
+	for _, volume := range volumeClaimsInSpec {
+		ephemeralVolumes = append(ephemeralVolumes, corev1.Volume{
+			Name: volume.ObjectMeta.Name,
+			// Ephemeral volume
+			VolumeSource: corev1.VolumeSource{
+				Ephemeral: &corev1.EphemeralVolumeSource{
+					VolumeClaimTemplate: &corev1.PersistentVolumeClaimTemplate{
+						Spec: volume.Spec,
+					},
+				},
+			},
+		})
+	}
+	return ephemeralVolumes
+}
+
+// Gets the desired TaskManager Deployment spec from a cluster spec.
+func newTaskManagerDeployment(flinkCluster *v1beta1.FlinkCluster) *appsv1.Deployment {
+	var taskManagerSpec = flinkCluster.Spec.TaskManager
+	var taskManagerDeploymentName = getTaskManagerDeploymentName(flinkCluster.Name)
+	var podLabels = getComponentLabels(flinkCluster, "taskmanager")
+	podLabels = mergeLabels(podLabels, taskManagerSpec.PodLabels)
+	var deploymentLabels = mergeLabels(podLabels, getRevisionHashLabels(&flinkCluster.Status.Revision))
+
+	mainContainer := newTaskManagerContainer(flinkCluster)
+	podSpec := newTaskManagerPodSpec(mainContainer, flinkCluster)
+	podSpec.Volumes = append(podSpec.Volumes, getEphemeralVolumesFromTaskManagerSpec(flinkCluster)...)
+
+	return &appsv1.Deployment{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace: flinkCluster.Namespace,
+			Name: taskManagerDeploymentName,
+			OwnerReferences: []metav1.OwnerReference{ToOwnerReference(flinkCluster)},
+			Labels: deploymentLabels,
+		},
+		Spec: appsv1.DeploymentSpec{
+			Replicas: taskManagerSpec.Replicas,
+			Selector: &metav1.LabelSelector{MatchLabels: podLabels},
+			Template: corev1.PodTemplateSpec{
+				ObjectMeta: metav1.ObjectMeta{
+					Labels: podLabels,
+					Annotations: taskManagerSpec.PodAnnotations,
+				},
+				Spec: *podSpec,
+			},
+		},
+	}
+}
+
 // Gets the desired PodDisruptionBudget.
 func newPodDisruptionBudget(flinkCluster *v1beta1.FlinkCluster) *policyv1.PodDisruptionBudget {
 	var jobSpec = flinkCluster.Spec.Job
@@ -1015,7 +1073,7 @@ func shouldCleanup(cluster *v1beta1.FlinkCluster, component string) bool {
 	case v1beta1.CleanupActionDeleteCluster:
 		return true
 	case v1beta1.CleanupActionDeleteTaskManager:
-		return component == "TaskManagerStatefulSet"
+		return component == "TaskManagerStatefulSet" || component == "TaskManagerDeployment"
 	}

 	return false
diff --git a/controllers/flinkcluster/flinkcluster_converter_test.go b/controllers/flinkcluster/flinkcluster_converter_test.go
index 13a0164f..771c8c75 100644
--- a/controllers/flinkcluster/flinkcluster_converter_test.go
+++ b/controllers/flinkcluster/flinkcluster_converter_test.go
@@ -33,31 +33,34 @@ import (
 	"k8s.io/apimachinery/pkg/util/intstr"
 )

-func TestGetDesiredClusterState(t *testing.T) {
-	var controller = true
-	var blockOwnerDeletion = false
-	var parallelism int32 = 2
-	var jmRPCPort int32 = 6123
-	var jmBlobPort int32 = 6124
-	var jmQueryPort int32 = 6125
-	var jmUIPort int32 = 8081
-	var useTLS = true
-	var tmDataPort int32 = 6121
-	var tmRPCPort int32 = 6122
-	var tmQueryPort int32 = 6125
-	var replicas int32 = 42
-	var tolerationSeconds int64 = 30
-	var restartPolicy = v1beta1.JobRestartPolicyFromSavepointOnFailure
-	var className = "org.apache.flink.examples.java.wordcount.WordCount"
-	var serviceAccount = "default"
-	var jarFile = "/cache/my-job.jar"
-	var hostFormat = "{{$clusterName}}.example.com"
-	var memoryOffHeapRatio int32 = 25
-	var memoryOffHeapMin = resource.MustParse("600M")
-	var memoryProcessRatio int32 = 80
-	var jobMode v1beta1.JobMode = v1beta1.JobModeDetached
-	var jobBackoffLimit int32 = 0
-	var jmReadinessProbe = corev1.Probe{
+// Defaults used in the upcoming tests
+var (
+	controller = true
+	blockOwnerDeletion = false
+	parallelism int32 = 2
+	jmRPCPort int32 = 6123
+	jmBlobPort int32 = 6124
+	jmQueryPort int32 = 6125
+	jmUIPort int32 = 8081
+	useTLS = true
+	tmDataPort int32 = 6121
+	tmRPCPort int32 = 6122
+	tmQueryPort int32 = 6125
+	replicas int32 = 42
+	tolerationSeconds int64 = 30
+	restartPolicy = v1beta1.JobRestartPolicyFromSavepointOnFailure
+	className = "org.apache.flink.examples.java.wordcount.WordCount"
+	serviceAccount = "default"
+	jarFile = "/cache/my-job.jar"
+	hostFormat = "{{$clusterName}}.example.com"
+	memoryOffHeapRatio int32 = 25
+	memoryOffHeapMin = resource.MustParse("600M")
+	memoryProcessRatio int32 = 80
+	jobMode v1beta1.JobMode = v1beta1.JobModeDetached
+	jobBackoffLimit int32 = 0
+	ingressPathType = networkingv1.PathTypePrefix
+	storageClassName = "default-class"
+	jmReadinessProbe = corev1.Probe{
 		Handler: corev1.Handler{
 			TCPSocket: &corev1.TCPSocketAction{
 				Port: intstr.FromInt(int(jmRPCPort)),
@@ -68,7 +71,7 @@ func TestGetDesiredClusterState(t *testing.T) {
 		PeriodSeconds: 5,
 		FailureThreshold: 60,
 	}
-	var jmLivenessProbe = corev1.Probe{
+	jmLivenessProbe = corev1.Probe{
 		Handler: corev1.Handler{
 			TCPSocket: &corev1.TCPSocketAction{
 				Port: intstr.FromInt(int(jmRPCPort)),
@@ -79,7 +82,7 @@ func TestGetDesiredClusterState(t *testing.T) {
 		PeriodSeconds: 60,
 		FailureThreshold: 5,
 	}
-	var tmReadinessProbe = corev1.Probe{
+	tmReadinessProbe = corev1.Probe{
 		Handler: corev1.Handler{
 			TCPSocket: &corev1.TCPSocketAction{
 				Port: intstr.FromInt(int(tmRPCPort)),
@@ -90,7 +93,7 @@ func TestGetDesiredClusterState(t *testing.T) {
 		PeriodSeconds: 5,
 		FailureThreshold: 60,
 	}
-	var tmLivenessProbe = corev1.Probe{
+	tmLivenessProbe = corev1.Probe{
 		Handler: corev1.Handler{
 			TCPSocket: &corev1.TCPSocketAction{
 				Port: intstr.FromInt(int(tmRPCPort)),
@@ -101,7 +104,7 @@ func TestGetDesiredClusterState(t *testing.T) {
 		PeriodSeconds: 60,
 		FailureThreshold: 5,
 	}
-	var tolerations = []corev1.Toleration{
+	tolerations = []corev1.Toleration{
 		{
 			Key: "toleration-key",
 			Effect: "toleration-effect",
@@ -117,16 +120,16 @@ func TestGetDesiredClusterState(t *testing.T) {
 			Value: "toleration-value2",
 		},
 	}
-	var userAndGroupId int64 = 9999
-	var securityContext = corev1.PodSecurityContext{
+	userAndGroupId int64 = 9999
+	securityContext = corev1.PodSecurityContext{
 		RunAsUser: &userAndGroupId,
 		RunAsGroup: &userAndGroupId,
 	}
-	var ingressPathType = networkingv1.PathTypePrefix
+)

-	// Setup.
-	storageClassName := "default-class"
-	var observed = &ObservedClusterState{
+func getObservedClusterState() *ObservedClusterState {
+
+	return &ObservedClusterState{
 		cluster: &v1beta1.FlinkCluster{
 			TypeMeta: metav1.TypeMeta{
 				Kind: "FlinkCluster",
@@ -221,7 +224,8 @@ func TestGetDesiredClusterState(t *testing.T) {
 					SecurityContext: &securityContext,
 				},
 				TaskManager: &v1beta1.TaskManagerSpec{
-					Replicas: &replicas,
+					DeploymentType: v1beta1.DeploymentTypeStatefulSet,
+					Replicas: &replicas,
 					Ports: v1beta1.TaskManagerPorts{
 						Data: &tmDataPort,
 						RPC: &tmRPCPort,
@@ -313,6 +317,12 @@ func TestGetDesiredClusterState(t *testing.T) {
 			},
 		},
 	}
+}
+
+func TestGetDesiredClusterState(t *testing.T) {
+
+	// Setup.
+	var observed = getObservedClusterState()

 	// Run.
 	var desiredState = getDesiredClusterState(observed)
@@ -1022,6 +1032,216 @@ taskmanager.rpc.port: 6122
 		expectedConfigMap)
 }

+func TestTmDeploymentTypeDeployment(t *testing.T) {
+	var observed *ObservedClusterState = getObservedClusterState()
+	observed.cluster.Spec.TaskManager.DeploymentType = v1beta1.DeploymentTypeDeployment
+
+	var desired = getDesiredClusterState(observed)
+
+	assert.Assert(t, desired.TmStatefulSet == nil)
+
+	var expectedDesiredTmDeployment = appsv1.Deployment{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "flinkjobcluster-sample-taskmanager",
+			Namespace: "default",
+			Labels: map[string]string{
+				"app": "flink",
+				"cluster": "flinkjobcluster-sample",
+				"component": "taskmanager",
+				RevisionNameLabel: "flinkjobcluster-sample-85dc8f749",
+			},
+			OwnerReferences: []metav1.OwnerReference{
+				{
+					APIVersion: "flinkoperator.k8s.io/v1beta1",
+					Kind: "FlinkCluster",
+					Name: "flinkjobcluster-sample",
+					Controller: &controller,
+					BlockOwnerDeletion: &blockOwnerDeletion,
+				},
+			},
+		},
+		Spec: appsv1.DeploymentSpec{
+			Replicas: &replicas,
+			Selector: &metav1.LabelSelector{
+				MatchLabels: map[string]string{
+					"app": "flink",
+					"cluster": "flinkjobcluster-sample",
+					"component": "taskmanager",
+				},
+			},
+			Template: corev1.PodTemplateSpec{
+				ObjectMeta: metav1.ObjectMeta{
+					Labels: map[string]string{
+						"app": "flink",
+						"cluster": "flinkjobcluster-sample",
+						"component": "taskmanager",
+					},
+					Annotations: map[string]string{
+						"example.com": "example",
+					},
+				},
+				Spec: corev1.PodSpec{
+					InitContainers: make([]corev1.Container, 0),
+					Containers: []corev1.Container{
+						{
+							Name: "taskmanager",
+							Image: "flink:1.8.1",
+							Args: []string{"taskmanager"},
+							Ports: []corev1.ContainerPort{
+								{Name: "data", ContainerPort: 6121},
+								{Name: "rpc", ContainerPort: 6122},
+								{Name: "query", ContainerPort: 6125},
+							},
+							LivenessProbe: &tmLivenessProbe,
+							ReadinessProbe: &tmReadinessProbe,
+							Env: []corev1.EnvVar{
+								{
+									Name: "TASK_MANAGER_CPU_LIMIT",
+									ValueFrom: &corev1.EnvVarSource{
+										ResourceFieldRef: &corev1.ResourceFieldSelector{
+											ContainerName: "taskmanager",
Resource: "limits.cpu", + Divisor: resource.MustParse("1m"), + }, + }, + }, + { + Name: "TASK_MANAGER_MEMORY_LIMIT", + ValueFrom: &corev1.EnvVarSource{ + ResourceFieldRef: &corev1.ResourceFieldSelector{ + ContainerName: "taskmanager", + Resource: "limits.memory", + Divisor: resource.MustParse("1Mi"), + }, + }, + }, + { + Name: "FOO", + Value: "abc", + }, + {Name: "HADOOP_CONF_DIR", Value: "/etc/hadoop/conf"}, + { + Name: "GOOGLE_APPLICATION_CREDENTIALS", + Value: "/etc/gcp_service_account/gcp_service_account_key.json", + }, + }, + EnvFrom: []corev1.EnvFromSource{ + { + ConfigMapRef: &corev1.ConfigMapEnvSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "FOOMAP", + }, + }, + }, + }, + Resources: corev1.ResourceRequirements{ + Requests: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("200m"), + corev1.ResourceMemory: resource.MustParse("512Mi"), + }, + Limits: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("500m"), + corev1.ResourceMemory: resource.MustParse("1Gi"), + }, + }, + VolumeMounts: []corev1.VolumeMount{ + {Name: "cache-volume", MountPath: "/cache"}, + {Name: "flink-config-volume", MountPath: "/opt/flink/conf"}, + { + Name: "hadoop-config-volume", + MountPath: "/etc/hadoop/conf", + ReadOnly: true, + }, + { + Name: "gcp-service-account-volume", + MountPath: "/etc/gcp_service_account/", + ReadOnly: true, + }, + }, + Lifecycle: &corev1.Lifecycle{ + PreStop: &corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{"sleep", strconv.Itoa(preStopSleepSeconds)}, + }, + }, + }, + }, + {Name: "sidecar", Image: "alpine"}, + }, + Volumes: []corev1.Volume{ + { + Name: "cache-volume", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + { + Name: "flink-config-volume", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "flinkjobcluster-sample-configmap", + }, + }, + }, + }, + { + Name: "hadoop-config-volume", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "hadoop-configmap", + }, + }, + }, + }, + { + Name: "gcp-service-account-volume", + VolumeSource: corev1.VolumeSource{ + Secret: &corev1.SecretVolumeSource{ + SecretName: "gcp-service-account-secret", + }, + }, + }, + { + Name: "pvc-test", + VolumeSource: corev1.VolumeSource{ + Ephemeral: &corev1.EphemeralVolumeSource{ + VolumeClaimTemplate: &corev1.PersistentVolumeClaimTemplate{ + Spec: corev1.PersistentVolumeClaimSpec{ + AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce}, + Resources: corev1.ResourceRequirements{ + Requests: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceStorage: resource.MustParse("100Gi"), + }, + }, + StorageClassName: &storageClassName, + }, + }, + }, + }, + }, + }, + TerminationGracePeriodSeconds: &terminationGracePeriodSeconds, + Tolerations: tolerations, + SecurityContext: &corev1.PodSecurityContext{ + RunAsUser: &userAndGroupId, + RunAsGroup: &userAndGroupId, + }, + ServiceAccountName: serviceAccount, + }, + }, + }, + } + + assert.Assert(t, desired.TmDeployment != nil) + assert.DeepEqual( + t, + *desired.TmDeployment, + expectedDesiredTmDeployment, + cmpopts.IgnoreUnexported(resource.Quantity{})) +} + func TestSecurityContext(t *testing.T) { var jmRPCPort int32 = 6123 var jmBlobPort int32 = 6124 @@ -1062,7 +1282,8 @@ func TestSecurityContext(t *testing.T) { 
 					SecurityContext: &securityContext,
 				},
 				TaskManager: &v1beta1.TaskManagerSpec{
-					Replicas: &tmReplicas,
+					Replicas: &tmReplicas,
+					DeploymentType: v1beta1.DeploymentTypeStatefulSet,
 					Ports: v1beta1.TaskManagerPorts{
 						Data: &tmDataPort,
 						RPC: &tmRPCPort,
@@ -1102,7 +1323,8 @@ func TestSecurityContext(t *testing.T) {
 				},
 			},
 			TaskManager: &v1beta1.TaskManagerSpec{
-				Replicas: &tmReplicas,
+				Replicas: &tmReplicas,
+				DeploymentType: v1beta1.DeploymentTypeStatefulSet,
 				Ports: v1beta1.TaskManagerPorts{
 					Data: &tmDataPort,
 					RPC: &tmRPCPort,
diff --git a/controllers/flinkcluster/flinkcluster_observer.go b/controllers/flinkcluster/flinkcluster_observer.go
index ba736e88..4477ca4e 100644
--- a/controllers/flinkcluster/flinkcluster_observer.go
+++ b/controllers/flinkcluster/flinkcluster_observer.go
@@ -23,6 +23,7 @@ import (
 	"time"

 	"github.com/go-logr/logr"
+
 	v1beta1 "github.com/spotify/flink-on-k8s-operator/apis/flinkcluster/v1beta1"
 	"github.com/spotify/flink-on-k8s-operator/internal/controllers/history"
 	flink "github.com/spotify/flink-on-k8s-operator/internal/flink"
@@ -59,6 +60,7 @@ type ObservedClusterState struct {
 	jmService *corev1.Service
 	jmIngress *networkingv1.Ingress
 	tmStatefulSet *appsv1.StatefulSet
+	tmDeployment *appsv1.Deployment
 	tmService *corev1.Service
 	podDisruptionBudget *policyv1.PodDisruptionBudget
 	persistentVolumeClaims *corev1.PersistentVolumeClaimList
@@ -238,7 +240,7 @@ func (observer *ClusterStateObserver) observe(
 		observed.jmIngress = observedJmIngress
 	}

-	// TaskManager StatefulSet.
+	// TaskManager StatefulSet
 	var observedTmStatefulSet = new(appsv1.StatefulSet)
 	err = observer.observeTaskManagerStatefulSet(observedTmStatefulSet)
 	if err != nil {
@@ -250,8 +252,23 @@ func (observer *ClusterStateObserver) observe(
 		observedTmStatefulSet = nil
 	} else {
 		log.Info("Observed TaskManager StatefulSet", "state", *observedTmStatefulSet)
-		observed.tmStatefulSet = observedTmStatefulSet
 	}
+	observed.tmStatefulSet = observedTmStatefulSet
+
+	// TaskManager Deployment
+	var observedTmDeployment = new(appsv1.Deployment)
+	err = observer.observeTaskManagerDeployment(observedTmDeployment)
+	if err != nil {
+		if client.IgnoreNotFound(err) != nil {
+			log.Error(err, "Failed to get TaskManager Deployment")
+			return err
+		}
+		log.Info("Observed TaskManager Deployment", "state", "nil")
+		observedTmDeployment = nil
+	} else {
+		log.Info("Observed TaskManager Deployment", "state", *observedTmDeployment)
+	}
+	observed.tmDeployment = observedTmDeployment

 	// TaskManager Service.
 	var observedTmSvc = new(corev1.Service)
@@ -510,6 +527,15 @@ func (observer *ClusterStateObserver) observeTaskManagerStatefulSet(
 		clusterNamespace, tmStatefulSetName, "TaskManager", observedStatefulSet)
 }

+func (observer *ClusterStateObserver) observeTaskManagerDeployment(
+	observedDeployment *appsv1.Deployment) error {
+	var clusterNamespace = observer.request.Namespace
+	var clusterName = observer.request.Name
+	var tmDeploymentName = getTaskManagerDeploymentName(clusterName)
+	return observer.observeDeployment(
+		clusterNamespace, tmDeploymentName, "TaskManager", observedDeployment)
+}
+
 func (observer *ClusterStateObserver) observeTaskManagerService(
 	observedSvc *corev1.Service) error {
 	var clusterNamespace = observer.request.Namespace
@@ -547,6 +573,30 @@ func (observer *ClusterStateObserver) observeStatefulSet(
 	return err
 }

+// observe deployment
+func (observer *ClusterStateObserver) observeDeployment(
+	namespace string,
+	name string,
+	component string,
+	observedDeployment *appsv1.Deployment) error {
+	var log = observer.log.WithValues("component", component)
+	var err = observer.k8sClient.Get(
+		observer.context,
+		types.NamespacedName{
+			Namespace: namespace,
+			Name: name,
+		},
+		observedDeployment)
+	if err != nil {
+		if client.IgnoreNotFound(err) != nil {
+			log.Error(err, "Failed to get Deployment")
+		} else {
+			log.Info("Deployment not found")
+		}
+	}
+	return err
+}
+
 func (observer *ClusterStateObserver) observeJobManagerService(
 	observedService *corev1.Service) error {
 	var clusterNamespace = observer.request.Namespace
diff --git a/controllers/flinkcluster/flinkcluster_reconciler.go b/controllers/flinkcluster/flinkcluster_reconciler.go
index bb56985b..4dafc0a2 100644
--- a/controllers/flinkcluster/flinkcluster_reconciler.go
+++ b/controllers/flinkcluster/flinkcluster_reconciler.go
@@ -110,6 +110,12 @@ func (reconciler *ClusterReconciler) reconcile() (ctrl.Result, error) {
 	if err != nil {
 		return ctrl.Result{}, err
 	}
+
+	err = reconciler.reconcileTaskManagerDeployment()
+	if err != nil {
+		return ctrl.Result{}, err
+	}
+
 	err = reconciler.reconcileTaskManagerService()
 	if err != nil {
 		return ctrl.Result{}, err
@@ -169,6 +175,13 @@ func (reconciler *ClusterReconciler) reconcileTaskManagerStatefulSet() error {
 		reconciler.observed.tmStatefulSet)
 }

+func (reconciler *ClusterReconciler) reconcileTaskManagerDeployment() error {
+	return reconciler.reconcileDeployment(
+		"TaskManager",
+		reconciler.desired.TmDeployment,
+		reconciler.observed.tmDeployment)
+}
+
 func (reconciler *ClusterReconciler) reconcileStatefulSet(
 	component string,
 	desiredStatefulSet *appsv1.StatefulSet,
@@ -205,6 +218,42 @@ func (reconciler *ClusterReconciler) reconcileStatefulSet(
 	return nil
 }

+func (reconciler *ClusterReconciler) reconcileDeployment(
+	component string,
+	desiredDeployment *appsv1.Deployment,
+	observedDeployment *appsv1.Deployment) error {
+	var log = reconciler.log.WithValues("component", component)
+
+	if desiredDeployment != nil && observedDeployment == nil {
+		return reconciler.createDeployment(desiredDeployment, component)
+	}
+
+	if desiredDeployment != nil && observedDeployment != nil {
+		var cluster = reconciler.observed.cluster
+		if shouldUpdateCluster(&reconciler.observed) && !isComponentUpdated(observedDeployment, cluster) {
+			updateComponent := fmt.Sprintf("%v Deployment", component)
+			var err error
+			if *reconciler.observed.cluster.Spec.RecreateOnUpdate {
+				err = reconciler.deleteOldComponent(desiredDeployment, observedDeployment, updateComponent)
+			} else {
+				err = reconciler.updateComponent(desiredDeployment, updateComponent)
+			}
+			if err != nil {
+				return err
+			}
+			return nil
+		}
+		log.Info("Deployment already exists, no action")
+		return nil
+	}
+
+	if desiredDeployment == nil && observedDeployment != nil {
+		return reconciler.deleteDeployment(observedDeployment, component)
+	}
+
+	return nil
+}
+
 func (reconciler *ClusterReconciler) reconcileTaskManagerService() error {
 	var desiredSvc = reconciler.desired.TmService
 	var observedSvc = reconciler.observed.tmService
@@ -269,6 +318,22 @@ func (reconciler *ClusterReconciler) createStatefulSet(
 	return err
 }

+func (reconciler *ClusterReconciler) createDeployment(
+	deployment *appsv1.Deployment, component string) error {
+	var context = reconciler.context
+	var log = reconciler.log.WithValues("component", component)
+	var k8sClient = reconciler.k8sClient
+
+	log.Info("Creating Deployment", "Deployment", *deployment)
+	var err = k8sClient.Create(context, deployment)
+	if err != nil {
+		log.Error(err, "Failed to create Deployment")
+	} else {
+		log.Info("Deployment created")
+	}
+	return err
+}
+
 func (reconciler *ClusterReconciler) deleteOldComponent(desired client.Object, observed runtime.Object, component string) error {
 	var log = reconciler.log.WithValues("component", component)
 	if isComponentUpdated(observed, reconciler.observed.cluster) {
@@ -320,6 +385,23 @@ func (reconciler *ClusterReconciler) deleteStatefulSet(
 	return err
 }

+func (reconciler *ClusterReconciler) deleteDeployment(
+	deployment *appsv1.Deployment, component string) error {
+	var context = reconciler.context
+	var log = reconciler.log.WithValues("component", component)
+	var k8sClient = reconciler.k8sClient
+
+	log.Info("Deleting Deployment", "Deployment", deployment)
+	var err = k8sClient.Delete(context, deployment)
+	err = client.IgnoreNotFound(err)
+	if err != nil {
+		log.Error(err, "Failed to delete Deployment")
+	} else {
+		log.Info("Deployment deleted")
+	}
+	return err
+}
+
 func (reconciler *ClusterReconciler) reconcileJobManagerService() error {
 	var desiredJmService = reconciler.desired.JmService
 	var observedJmService = reconciler.observed.jmService
diff --git a/controllers/flinkcluster/flinkcluster_updater.go b/controllers/flinkcluster/flinkcluster_updater.go
index 924cb6ff..663808f4 100644
--- a/controllers/flinkcluster/flinkcluster_updater.go
+++ b/controllers/flinkcluster/flinkcluster_updater.go
@@ -129,7 +129,7 @@ func (updater *ClusterStatusUpdater) createStatusChangeEvents(
 			newStatus.Components.JobManagerIngress.State)
 	}

-	// TaskManager.
+	// TaskManager StatefulSet.
 	if oldStatus.Components.TaskManagerStatefulSet.State !=
 		newStatus.Components.TaskManagerStatefulSet.State {
 		updater.createStatusChangeEvent(
@@ -138,6 +138,15 @@ func (updater *ClusterStatusUpdater) createStatusChangeEvents(
 			newStatus.Components.TaskManagerStatefulSet.State)
 	}

+	// TaskManager Deployment.
+	if oldStatus.Components.TaskManagerDeployment.State !=
+		newStatus.Components.TaskManagerDeployment.State {
+		updater.createStatusChangeEvent(
+			"TaskManager Deployment",
+			oldStatus.Components.TaskManagerDeployment.State,
+			newStatus.Components.TaskManagerDeployment.State)
+	}
+
 	// Job.
 	if oldStatus.Components.Job == nil && newStatus.Components.Job != nil {
 		updater.createStatusChangeEvent(
@@ -392,6 +401,28 @@ func (updater *ClusterStatusUpdater) deriveClusterStatus(
 		}
 	}

+	// TaskManager Deployment.
+	var observedTmDeployment = observed.tmDeployment
+	if !isComponentUpdated(observedTmDeployment, observed.cluster) && shouldUpdateCluster(observed) {
+		recorded.Components.TaskManagerDeployment.DeepCopyInto(&status.Components.TaskManagerDeployment)
+		status.Components.TaskManagerDeployment.State = v1beta1.ComponentStateUpdating
+	} else if observedTmDeployment != nil {
+		status.Components.TaskManagerDeployment.Name =
+			observedTmDeployment.Name
+		status.Components.TaskManagerDeployment.State =
+			getDeploymentState(observedTmDeployment)
+		if status.Components.TaskManagerDeployment.State ==
+			v1beta1.ComponentStateReady {
+			runningComponents++
+		}
+	} else if recorded.Components.TaskManagerDeployment.Name != "" {
+		status.Components.TaskManagerDeployment =
+			v1beta1.FlinkClusterComponentState{
+				Name: recorded.Components.TaskManagerDeployment.Name,
+				State: v1beta1.ComponentStateDeleted,
+			}
+	}
+
 	// Derive the new cluster state.
 	var jobStatus = recorded.Components.Job
 	switch recorded.State {
@@ -763,6 +794,16 @@ func (updater *ClusterStatusUpdater) isStatusChanged(
 			newStatus.Components.TaskManagerStatefulSet)
 		changed = true
 	}
+	if newStatus.Components.TaskManagerDeployment !=
+		currentStatus.Components.TaskManagerDeployment {
+		updater.log.Info(
+			"TaskManager Deployment status changed",
+			"current",
+			currentStatus.Components.TaskManagerDeployment,
+			"new",
+			newStatus.Components.TaskManagerDeployment)
+		changed = true
+	}
 	if currentStatus.Components.Job == nil {
 		if newStatus.Components.Job != nil {
 			updater.log.Info(
@@ -1007,3 +1048,10 @@ func getStatefulSetState(statefulSet *appsv1.StatefulSet) string {
 	}
 	return v1beta1.ComponentStateNotReady
 }
+
+func getDeploymentState(deployment *appsv1.Deployment) string {
+	if deployment.Status.ReadyReplicas >= *deployment.Spec.Replicas {
+		return v1beta1.ComponentStateReady
+	}
+	return v1beta1.ComponentStateNotReady
+}
diff --git a/controllers/flinkcluster/flinkcluster_updater_test.go b/controllers/flinkcluster/flinkcluster_updater_test.go
index 8c11500c..f8fc0d59 100644
--- a/controllers/flinkcluster/flinkcluster_updater_test.go
+++ b/controllers/flinkcluster/flinkcluster_updater_test.go
@@ -209,7 +209,7 @@ func TestClusterStatus(t *testing.T) {
 	}

 	var updater = &ClusterStatusUpdater{log: log.Log, observed: observed}
-	cluster := v1beta1.FlinkCluster{Status: oldStatus}
+	cluster := v1beta1.FlinkCluster{Status: oldStatus, Spec: v1beta1.FlinkClusterSpec{TaskManager: &v1beta1.TaskManagerSpec{DeploymentType: v1beta1.DeploymentTypeStatefulSet}}}
 	newStatus := updater.deriveClusterStatus(&cluster, &observed)
 	assert.Assert(t, updater.isStatusChanged(oldStatus, newStatus))
 	assert.Equal(t, newStatus.State, v1beta1.ClusterStateRunning)
diff --git a/controllers/flinkcluster/flinkcluster_util.go b/controllers/flinkcluster/flinkcluster_util.go
index 61ed9824..ee3e6400 100644
--- a/controllers/flinkcluster/flinkcluster_util.go
+++ b/controllers/flinkcluster/flinkcluster_util.go
@@ -122,6 +122,11 @@ func getTaskManagerStatefulSetName(clusterName string) string {
 	return clusterName + "-taskmanager"
 }

+// Gets TaskManager Deployment name
+func getTaskManagerDeploymentName(clusterName string) string {
+	return clusterName + "-taskmanager"
+}
+
 func getJobManagerJobName(clusterName string) string {
 	return clusterName + "-jobmanager"
 }
diff --git a/internal/batchscheduler/volcano/volcano.go b/internal/batchscheduler/volcano/volcano.go
index 3904a2eb..01649fc9 100644
--- a/internal/batchscheduler/volcano/volcano.go
+++ b/internal/batchscheduler/volcano/volcano.go
@@ -93,7 +93,10 @@ func (v *VolcanoBatchScheduler) setSchedulerMeta(pg *scheduling.PodGroup, state
 	if state.TmStatefulSet != nil {
 		setMeta(&state.TmStatefulSet.Spec.Template)
+	} else if state.TmDeployment != nil {
+		setMeta(&state.TmDeployment.Spec.Template)
 	}
+
 	if state.JmStatefulSet != nil {
 		setMeta(&state.JmStatefulSet.Spec.Template)
 	}
@@ -136,7 +139,7 @@ func (v *VolcanoBatchScheduler) syncPodGroup(
 	podGroupName := fmt.Sprintf(podGroupNameFormat, options.ClusterName)
 	namespace := options.ClusterNamespace

-	if state.JmStatefulSet == nil && state.TmStatefulSet == nil {
+	if state.JmStatefulSet == nil && (state.TmStatefulSet == nil && state.TmDeployment == nil) {
 		// remove the podgroup if the JobManager/TaskManager statefulset are not set
 		err := v.deletePodGroup(podGroupName, namespace)
 		if !errors.IsNotFound(err) {
@@ -195,6 +198,11 @@ func getClusterResource(state *model.DesiredClusterState) (*corev1.ResourceRequi
 		size += *spec.Replicas
 		resources := getStatefulSetResources(&spec)
 		addResourceRequirements(reqs, resources)
+	} else if state.TmDeployment != nil {
+		spec := state.TmDeployment.Spec
+		size += *spec.Replicas
+		resources := getDeploymentResources(&spec)
+		addResourceRequirements(reqs, resources)
 	}

 	if state.Job != nil {
@@ -224,6 +232,19 @@ func getStatefulSetResources(spec *appsv1.StatefulSetSpec) *corev1.ResourceRequi
 	return reqs
 }

+func getDeploymentResources(spec *appsv1.DeploymentSpec) *corev1.ResourceRequirements {
+	reqs := &corev1.ResourceRequirements{
+		Limits: map[corev1.ResourceName]resource.Quantity{},
+		Requests: map[corev1.ResourceName]resource.Quantity{},
+	}
+
+	for i := int32(0); i < *spec.Replicas; i++ {
+		tmResource := getPodResource(&spec.Template.Spec)
+		addResourceRequirements(reqs, tmResource)
+	}
+	return reqs
+}
+
 func getPodResource(spec *corev1.PodSpec) *corev1.ResourceRequirements {
 	reqs := &corev1.ResourceRequirements{
 		Limits: map[corev1.ResourceName]resource.Quantity{},
diff --git a/internal/model/model.go b/internal/model/model.go
index 4745e7e9..12b62f0a 100644
--- a/internal/model/model.go
+++ b/internal/model/model.go
@@ -27,6 +27,7 @@ type DesiredClusterState struct {
 	JmService *corev1.Service
 	JmIngress *networkingv1.Ingress
 	TmStatefulSet *appsv1.StatefulSet
+	TmDeployment *appsv1.Deployment
 	TmService *corev1.Service
 	ConfigMap *corev1.ConfigMap
 	Job *batchv1.Job
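Example usage (illustrative only, not part of the patch): a minimal FlinkCluster manifest sketch for the new spec.taskManager.deploymentType field. The cluster name and storage values below are hypothetical, and the required image/jobManager sections are omitted; with deploymentType: Deployment the volumeClaimTemplates are mounted as generic ephemeral volumes on the TaskManager pods instead of StatefulSet-managed PVCs, the field defaults to StatefulSet when unset, and the validator rejects changing it on an existing cluster.

apiVersion: flinkoperator.k8s.io/v1beta1
kind: FlinkCluster
metadata:
  name: flinkjobcluster-sample        # hypothetical cluster name
spec:
  # image, jobManager, and the other required sections are omitted here
  taskManager:
    deploymentType: Deployment        # new field; defaults to StatefulSet when unset
    replicas: 3
    volumeClaimTemplates:
      - metadata:
          name: pvc-test              # becomes an ephemeral volume in the Deployment's PodSpec
        spec:
          accessModes: ["ReadWriteOnce"]
          storageClassName: default-class
          resources:
            requests:
              storage: 100Gi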