
Add TaskManager deployment through Deployment resource (#410)
live-wire authored Jun 2, 2022
1 parent a0eb731 commit ee9d70a
Showing 17 changed files with 612 additions and 54 deletions.
4 changes: 4 additions & 0 deletions apis/flinkcluster/v1beta1/flinkcluster_default.go
@@ -232,6 +232,10 @@ func _SetTaskManagerDefault(tmSpec *TaskManagerSpec, flinkVersion *version.Versi
mergo.Merge(&readinessProbe, tmSpec.ReadinessProbe, mergo.WithOverride)
}
tmSpec.ReadinessProbe = &readinessProbe

if tmSpec.DeploymentType == "" {
tmSpec.DeploymentType = DeploymentTypeStatefulSet
}
}

func _SetJobDefault(jobSpec *JobSpec) {
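For context, here is a minimal standalone sketch of the defaulting rule introduced above. The helper name and the main wrapper are illustrative and not part of the commit; the rule itself mirrors _SetTaskManagerDefault: an unset deploymentType falls back to StatefulSet, while an explicit value is left untouched.

package main

import "fmt"

type DeploymentType string

const (
	DeploymentTypeStatefulSet DeploymentType = "StatefulSet"
	DeploymentTypeDeployment  DeploymentType = "Deployment"
)

// defaultDeploymentType applies the same fallback as the defaulter above.
func defaultDeploymentType(t DeploymentType) DeploymentType {
	if t == "" {
		return DeploymentTypeStatefulSet // unset: default to StatefulSet
	}
	return t // explicit values (e.g. Deployment) are kept as-is
}

func main() {
	fmt.Println(defaultDeploymentType(""))                       // StatefulSet
	fmt.Println(defaultDeploymentType(DeploymentTypeDeployment)) // Deployment
}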
10 changes: 7 additions & 3 deletions apis/flinkcluster/v1beta1/flinkcluster_default_test.go
@@ -20,6 +20,7 @@ import (
"testing"

"github.com/google/go-cmp/cmp/cmpopts"

"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/intstr"

@@ -135,7 +136,8 @@ func TestSetDefault(t *testing.T) {
ReadinessProbe: &defaultJmReadinessProbe,
},
TaskManager: &TaskManagerSpec{
Replicas: &tmReplicas,
Replicas: &tmReplicas,
DeploymentType: DeploymentTypeStatefulSet,
Ports: TaskManagerPorts{
Data: &defaultTmDataPort,
RPC: &defaultTmRPCPort,
@@ -290,7 +292,8 @@ func TestSetNonDefault(t *testing.T) {
ReadinessProbe: &jmReadinessProbe,
},
TaskManager: &TaskManagerSpec{
Replicas: &tmReplicas,
Replicas: &tmReplicas,
DeploymentType: DeploymentTypeDeployment,
Ports: TaskManagerPorts{
Data: &tmDataPort,
RPC: &tmRPCPort,
@@ -379,7 +382,8 @@ func TestSetNonDefault(t *testing.T) {
ReadinessProbe: &jmExpectedReadinessProbe,
},
TaskManager: &TaskManagerSpec{
Replicas: &tmReplicas,
Replicas: &tmReplicas,
DeploymentType: DeploymentTypeDeployment,
Ports: TaskManagerPorts{
Data: &tmDataPort,
RPC: &tmRPCPort,
27 changes: 26 additions & 1 deletion apis/flinkcluster/v1beta1/flinkcluster_types.go
@@ -293,8 +293,24 @@ type TaskManagerPorts struct {
Query *int32 `json:"query,omitempty"`
}

// DeploymentType is the Kubernetes workload API kind used to run TaskManager workers.
type DeploymentType string

const (
// This refers to the Kubernetes [StatefulSet](https://kubernetes.io/docs/concepts/workloads/controllers/statefulset) type.
// Uses persistent volumes for recovery.
DeploymentTypeStatefulSet = "StatefulSet"

// This refers to the Kubernetes [Deployment](https://kubernetes.io/docs/concepts/workloads/controllers/deployment) type.
// Starts up faster, but the volumes are ephemeral.
DeploymentTypeDeployment = "Deployment"
)

// TaskManagerSpec defines properties of TaskManager.
type TaskManagerSpec struct {
// _(Optional)_ Defines the workload type used for TaskManager replicas. If not specified, the default value is `StatefulSet`.
DeploymentType DeploymentType `json:"deploymentType,omitempty"`

// The number of replicas. default: `3`
Replicas *int32 `json:"replicas,omitempty"`

@@ -334,6 +350,12 @@ type TaskManagerSpec struct {
// _(Optional)_ A template for persistent volume claims, each requested and mounted to the TaskManager pod.
// This can be used to mount an external volume with a specific storageClass or larger capacity (for a larger/faster state backend).
// [More info](https://kubernetes.io/docs/concepts/storage/persistent-volumes/#persistentvolumeclaims)

// If deploymentType: StatefulSet is used, these templates are added to the TaskManager StatefulSet template,
// hence mounting persistent PVCs to the indexed StatefulSet pods.
//
// If deploymentType: Deployment is used, these templates are appended as ephemeral volumes in the PodSpec,
// hence mounting ephemeral PVCs to the ReplicaSet pods.
VolumeClaimTemplates []corev1.PersistentVolumeClaim `json:"volumeClaimTemplates,omitempty"`

// _(Optional)_ Init containers of the Task Manager pod.
@@ -664,7 +686,10 @@ type FlinkClusterComponentsStatus struct {
JobManagerIngress *JobManagerIngressStatus `json:"jobManagerIngress,omitempty"`

// The state of TaskManager StatefulSet.
TaskManagerStatefulSet FlinkClusterComponentState `json:"taskManagerStatefulSet"`
TaskManagerStatefulSet FlinkClusterComponentState `json:"taskManagerStatefulSet,omitempty"`

// The state of TaskManager Deployment.
TaskManagerDeployment FlinkClusterComponentState `json:"taskManagerDeployment,omitempty"`

// The status of the job, available only when JobSpec is provided.
Job *JobStatus `json:"job,omitempty"`
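Putting the new fields together, the sketch below builds a TaskManagerSpec that opts into the Deployment workload and carries one volume claim template. It is illustrative only: the claim name tm-data, the storage class, and the size are made up, and v1beta1 stands for this repository's apis/flinkcluster/v1beta1 package.

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func exampleTaskManagerSpec() v1beta1.TaskManagerSpec {
	replicas := int32(2)
	storageClass := "standard" // illustrative storage class
	return v1beta1.TaskManagerSpec{
		// Run TaskManagers as a Deployment instead of the default StatefulSet.
		DeploymentType: v1beta1.DeploymentTypeDeployment,
		Replicas:       &replicas,
		VolumeClaimTemplates: []corev1.PersistentVolumeClaim{{
			ObjectMeta: metav1.ObjectMeta{Name: "tm-data"},
			Spec: corev1.PersistentVolumeClaimSpec{
				AccessModes:      []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
				StorageClassName: &storageClass,
				Resources: corev1.ResourceRequirements{
					Requests: corev1.ResourceList{
						corev1.ResourceStorage: resource.MustParse("10Gi"),
					},
				},
			},
		}},
	}
}

With deploymentType: Deployment, the converter (flinkcluster_converter.go below) turns this claim template into a generic ephemeral volume, so the backing PVC lives and dies with its pod; with the default StatefulSet the same template becomes an ordinary volumeClaimTemplate and the PVC survives pod restarts.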
13 changes: 13 additions & 0 deletions apis/flinkcluster/v1beta1/flinkcluster_validate.go
@@ -118,6 +118,11 @@ func (v *Validator) ValidateUpdate(old *FlinkCluster, new *FlinkCluster) error {
return nil
}

err = v.validateTaskManagerUpdate(old, new)
if err != nil {
return err
}

err = v.validateJobUpdate(old, new)
if err != nil {
return err
@@ -225,6 +230,14 @@ func (v *Validator) checkSavepointGeneration(
"you cannot update savepointGeneration with others at the same time")
}

func (v *Validator) validateTaskManagerUpdate(old *FlinkCluster, new *FlinkCluster) error {
if old.Spec.TaskManager.DeploymentType != new.Spec.TaskManager.DeploymentType {
return fmt.Errorf(
"updating deploymentType is not allowed")
}
return nil
}

// Validate job update.
func (v *Validator) validateJobUpdate(old *FlinkCluster, new *FlinkCluster) error {
switch {
10 changes: 10 additions & 0 deletions apis/flinkcluster/v1beta1/flinkcluster_validate_test.go
@@ -615,6 +615,16 @@ func TestUpdateSavepointGeneration(t *testing.T) {
assert.Equal(t, err, nil)
}

func TestTaskManagerDeploymentTypeUpdate(t *testing.T) {
// cannot update deploymentType
var oldCluster = getSimpleFlinkCluster()
var newCluster = getSimpleFlinkCluster()
newCluster.Spec.TaskManager.DeploymentType = DeploymentTypeDeployment
err := validator.ValidateUpdate(&oldCluster, &newCluster)
expectedErr := "updating deploymentType is not allowed"
assert.Equal(t, err.Error(), expectedErr)
}

func TestUpdateJob(t *testing.T) {
var validator = &Validator{}
var tc = &TimeConverter{}
1 change: 1 addition & 0 deletions apis/flinkcluster/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

13 changes: 12 additions & 1 deletion config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml
@@ -3932,6 +3932,8 @@ spec:
type: string
taskManager:
properties:
deploymentType:
type: string
extraPorts:
items:
properties:
@@ -6325,6 +6327,16 @@ spec:
- name
- state
type: object
taskManagerDeployment:
properties:
name:
type: string
state:
type: string
required:
- name
- state
type: object
taskManagerStatefulSet:
properties:
name:
@@ -6339,7 +6351,6 @@
- configMap
- jobManagerService
- jobManagerStatefulSet
- taskManagerStatefulSet
type: object
control:
properties:
5 changes: 4 additions & 1 deletion controllers/flinkcluster/flinkcluster_controller.go
@@ -213,9 +213,12 @@ func (handler *FlinkClusterHandler) reconcile(ctx context.Context,
}
if desired.TmStatefulSet != nil {
log.Info("Desired state", "TaskManager StatefulSet", *desired.TmStatefulSet)
} else if desired.TmDeployment != nil {
log.Info("Desired state", "TaskManager Deployment", *desired.TmDeployment)
} else {
log.Info("Desired state", "TaskManager StatefulSet", "nil")
log.Info("Desired state", "TaskManager", "nil")
}

if desired.Job != nil {
log.Info("Desired state", "Job", *desired.Job)
} else {
68 changes: 63 additions & 5 deletions controllers/flinkcluster/flinkcluster_converter.go
@@ -98,8 +98,14 @@ func getDesiredClusterState(observed *ObservedClusterState) *model.DesiredCluste
state.JmStatefulSet = newJobManagerStatefulSet(cluster)
}

if !shouldCleanup(cluster, "TaskManagerStatefulSet") {
state.TmStatefulSet = newTaskManagerStatefulSet(cluster)
if cluster.Spec.TaskManager.DeploymentType == v1beta1.DeploymentTypeStatefulSet {
if !shouldCleanup(cluster, "TaskManagerStatefulSet") {
state.TmStatefulSet = newTaskManagerStatefulSet(cluster)
}
} else {
if !shouldCleanup(cluster, "TaskManagerDeployment") {
state.TmDeployment = newTaskManagerDeployment(cluster)
}
}

if !shouldCleanup(cluster, "TaskManagerService") {
@@ -406,7 +412,7 @@ func newJobManagerIngress(
return jobManagerIngress
}

func newTaskMangerContainer(flinkCluster *v1beta1.FlinkCluster) *corev1.Container {
func newTaskManagerContainer(flinkCluster *v1beta1.FlinkCluster) *corev1.Container {
var imageSpec = flinkCluster.Spec.Image
var taskManagerSpec = flinkCluster.Spec.TaskManager
var dataPort = corev1.ContainerPort{Name: "data", ContainerPort: *taskManagerSpec.Ports.Data}
@@ -498,7 +504,7 @@ func newTaskManagerStatefulSet(flinkCluster *v1beta1.FlinkCluster) *appsv1.State
podLabels = mergeLabels(podLabels, taskManagerSpec.PodLabels)
var statefulSetLabels = mergeLabels(podLabels, getRevisionHashLabels(&flinkCluster.Status.Revision))

mainContainer := newTaskMangerContainer(flinkCluster)
mainContainer := newTaskManagerContainer(flinkCluster)
podSpec := newTaskManagerPodSpec(mainContainer, flinkCluster)

var pvcs []corev1.PersistentVolumeClaim
@@ -534,6 +540,58 @@ func newTaskManagerStatefulSet(flinkCluster *v1beta1.FlinkCluster) *appsv1.State
}
}

func getEphemeralVolumesFromTaskManagerSpec(flinkCluster *v1beta1.FlinkCluster) []corev1.Volume {
var ephemeralVolumes []corev1.Volume
var volumeClaimsInSpec = flinkCluster.Spec.TaskManager.VolumeClaimTemplates
for _, volume := range volumeClaimsInSpec {
ephemeralVolumes = append(ephemeralVolumes, corev1.Volume{
Name: volume.ObjectMeta.Name,
// Ephemeral volume
VolumeSource: corev1.VolumeSource{
Ephemeral: &corev1.EphemeralVolumeSource{
VolumeClaimTemplate: &corev1.PersistentVolumeClaimTemplate{
Spec: volume.Spec,
},
},
},
})
}
return ephemeralVolumes
}

// Gets the desired TaskManager Deployment spec from a cluster spec.
func newTaskManagerDeployment(flinkCluster *v1beta1.FlinkCluster) *appsv1.Deployment {
var taskManagerSpec = flinkCluster.Spec.TaskManager
var taskManagerDeploymentName = getTaskManagerDeploymentName(flinkCluster.Name)
var podLabels = getComponentLabels(flinkCluster, "taskmanager")
podLabels = mergeLabels(podLabels, taskManagerSpec.PodLabels)
var deploymentLabels = mergeLabels(podLabels, getRevisionHashLabels(&flinkCluster.Status.Revision))

mainContainer := newTaskManagerContainer(flinkCluster)
podSpec := newTaskManagerPodSpec(mainContainer, flinkCluster)
podSpec.Volumes = append(podSpec.Volumes, getEphemeralVolumesFromTaskManagerSpec(flinkCluster)...)

return &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Namespace: flinkCluster.Namespace,
Name: taskManagerDeploymentName,
OwnerReferences: []metav1.OwnerReference{ToOwnerReference(flinkCluster)},
Labels: deploymentLabels,
},
Spec: appsv1.DeploymentSpec{
Replicas: taskManagerSpec.Replicas,
Selector: &metav1.LabelSelector{MatchLabels: podLabels},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: podLabels,
Annotations: taskManagerSpec.PodAnnotations,
},
Spec: *podSpec,
},
},
}
}

// Gets the desired PodDisruptionBudget.
func newPodDisruptionBudget(flinkCluster *v1beta1.FlinkCluster) *policyv1.PodDisruptionBudget {
var jobSpec = flinkCluster.Spec.Job
@@ -1015,7 +1073,7 @@ func shouldCleanup(cluster *v1beta1.FlinkCluster, component string) bool {
case v1beta1.CleanupActionDeleteCluster:
return true
case v1beta1.CleanupActionDeleteTaskManager:
return component == "TaskManagerStatefulSet"
return component == "TaskManagerStatefulSet" || component == "TaskManagerDeployment"
}

return false
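For reference, under the Deployment path each claim template surfaces in the pod template as a generic ephemeral volume, roughly as in the sketch below (an illustrative helper mirroring getEphemeralVolumesFromTaskManagerSpec above, not code from the commit):

import corev1 "k8s.io/api/core/v1"

// toEphemeralVolume shows how a single claim template maps to a pod volume.
func toEphemeralVolume(claim corev1.PersistentVolumeClaim) corev1.Volume {
	return corev1.Volume{
		Name: claim.ObjectMeta.Name, // volume name comes from the claim template's name
		VolumeSource: corev1.VolumeSource{
			Ephemeral: &corev1.EphemeralVolumeSource{
				VolumeClaimTemplate: &corev1.PersistentVolumeClaimTemplate{
					Spec: claim.Spec, // PVC spec copied as-is; Kubernetes creates one PVC per pod
				},
			},
		},
	}
}

Because Kubernetes creates and deletes the backing PVC together with each pod, the Deployment variant starts faster but leaves no volume behind for recovery, whereas the StatefulSet path keeps the classic indexed, persistent volumeClaimTemplates.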