Add a headless Service for the TaskManager StatefulSet (revised patch).

Review notes:
- createTaskManagerService now reports a failed create with log.Error
  instead of log.Info, matching deleteTaskManagerService and the observer.
- Doc comments were added to the new functions; hunk line counts were
  adjusted accordingly.
- The post-image blob ids on the "index" lines were NOT recomputed after
  this revision.

diff --git a/controllers/flinkcluster/flinkcluster_controller.go b/controllers/flinkcluster/flinkcluster_controller.go
index 717d15ac..5de29f9e 100644
--- a/controllers/flinkcluster/flinkcluster_controller.go
+++ b/controllers/flinkcluster/flinkcluster_controller.go
@@ -191,6 +191,11 @@ func (handler *FlinkClusterHandler) reconcile(ctx context.Context,
 	} else {
 		log.Info("Desired state", "PodDisruptionBudget", "nil")
 	}
+	if desired.TmService != nil {
+		log.Info("Desired state", "TaskManager Service", *desired.TmService)
+	} else {
+		log.Info("Desired state", "TaskManager Service", "nil")
+	}
 	if desired.JmStatefulSet != nil {
 		log.Info("Desired state", "JobManager StatefulSet", *desired.JmStatefulSet)
 	} else {
diff --git a/controllers/flinkcluster/flinkcluster_converter.go b/controllers/flinkcluster/flinkcluster_converter.go
index f4794e2f..56da2c4d 100644
--- a/controllers/flinkcluster/flinkcluster_converter.go
+++ b/controllers/flinkcluster/flinkcluster_converter.go
@@ -89,9 +89,11 @@ func getDesiredClusterState(observed *ObservedClusterState) *model.DesiredCluste
 	if !shouldCleanup(cluster, "ConfigMap") {
 		state.ConfigMap = newConfigMap(cluster)
 	}
+
 	if !shouldCleanup(cluster, "PodDisruptionBudget") {
 		state.PodDisruptionBudget = newPodDisruptionBudget(cluster)
 	}
+
 	if !shouldCleanup(cluster, "JobManagerStatefulSet") && !applicationMode {
 		state.JmStatefulSet = newJobManagerStatefulSet(cluster)
 	}
@@ -100,6 +102,10 @@ func getDesiredClusterState(observed *ObservedClusterState) *model.DesiredCluste
 		state.TmStatefulSet = newTaskManagerStatefulSet(cluster)
 	}
 
+	if !shouldCleanup(cluster, "TaskManagerService") {
+		state.TmService = newTaskManagerService(cluster)
+	}
+
 	if !shouldCleanup(cluster, "JobManagerService") {
 		state.JmService = newJobManagerService(cluster)
 	}
@@ -557,6 +563,56 @@ func newPodDisruptionBudget(flinkCluster *v1beta1.FlinkCluster) *policyv1.PodDis
 	}
 }
 
+// Gets the desired TaskManager Headless Service.
+//
+// Returns nil when the cluster spec has no TaskManager section. The Service
+// has no cluster IP, uses the "taskmanager" component labels as its selector
+// and exposes the data, rpc and query ports from the TaskManager spec.
+// NOTE(review): the port pointers are dereferenced unchecked; this assumes
+// they were defaulted earlier - confirm.
+func newTaskManagerService(flinkCluster *v1beta1.FlinkCluster) *corev1.Service {
+	var tmSpec = flinkCluster.Spec.TaskManager
+	if tmSpec == nil {
+		return nil
+	}
+	var clusterNamespace = flinkCluster.Namespace
+	var clusterName = flinkCluster.Name
+	// Service name matches the service name defined in the TM StatefulSet spec
+	var tmSvcName = getTaskManagerStatefulSetName(clusterName)
+	var labels = getClusterLabels(flinkCluster)
+	var tmSelector = getComponentLabels(flinkCluster, "taskmanager")
+
+	var tmSvcPorts = []corev1.ServicePort{
+		{
+			Name: "data",
+			Port: *tmSpec.Ports.Data,
+		},
+		{
+			Name: "rpc",
+			Port: *tmSpec.Ports.RPC,
+		},
+		{
+			Name: "query",
+			Port: *tmSpec.Ports.Query,
+		},
+	}
+
+	return &corev1.Service{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace:       clusterNamespace,
+			Name:            tmSvcName,
+			OwnerReferences: []metav1.OwnerReference{ToOwnerReference(flinkCluster)},
+			Labels:          labels,
+		},
+		Spec: corev1.ServiceSpec{
+			Selector:  tmSelector,
+			ClusterIP: corev1.ClusterIPNone,
+			Type:      corev1.ServiceTypeClusterIP,
+			Ports:     tmSvcPorts,
+		},
+	}
+}
+
 // Gets the desired configMap.
 func newConfigMap(flinkCluster *v1beta1.FlinkCluster) *corev1.ConfigMap {
 	appVersion, _ := version.NewVersion(flinkCluster.Spec.FlinkVersion)
diff --git a/controllers/flinkcluster/flinkcluster_observer.go b/controllers/flinkcluster/flinkcluster_observer.go
index 4b670567..ba736e88 100644
--- a/controllers/flinkcluster/flinkcluster_observer.go
+++ b/controllers/flinkcluster/flinkcluster_observer.go
@@ -59,6 +59,7 @@ type ObservedClusterState struct {
 	jmService              *corev1.Service
 	jmIngress              *networkingv1.Ingress
 	tmStatefulSet          *appsv1.StatefulSet
+	tmService              *corev1.Service
 	podDisruptionBudget    *policyv1.PodDisruptionBudget
 	persistentVolumeClaims *corev1.PersistentVolumeClaimList
 	flinkJob               FlinkJob
@@ -252,6 +253,21 @@ func (observer *ClusterStateObserver) observe(
 		observed.tmStatefulSet = observedTmStatefulSet
 	}
 
+	// TaskManager Service.
+	var observedTmSvc = new(corev1.Service)
+	err = observer.observeTaskManagerService(observedTmSvc)
+	if err != nil {
+		if client.IgnoreNotFound(err) != nil {
+			log.Error(err, "Failed to get TaskManager Service")
+			return err
+		}
+		log.Info("Observed TaskManager Service", "state", "nil")
+		observedTmSvc = nil
+	} else {
+		log.Info("Observed TaskManager Service", "state", *observedTmSvc)
+		observed.tmService = observedTmSvc
+	}
+
 	// (Optional) Savepoint.
 	var observedSavepoint Savepoint
 	err = observer.observeSavepoint(observed.cluster, &observedSavepoint)
@@ -494,6 +510,23 @@ func (observer *ClusterStateObserver) observeTaskManagerStatefulSet(
 		clusterNamespace, tmStatefulSetName, "TaskManager", observedStatefulSet)
 }
 
+// Fetches the TaskManager Service for the current request into observedSvc.
+// The Service is looked up in the request namespace under the TaskManager
+// StatefulSet name; the client error (including NotFound) is returned as is.
+func (observer *ClusterStateObserver) observeTaskManagerService(
+	observedSvc *corev1.Service) error {
+	var clusterNamespace = observer.request.Namespace
+	var clusterName = observer.request.Name
+
+	return observer.k8sClient.Get(
+		observer.context,
+		types.NamespacedName{
+			Namespace: clusterNamespace,
+			Name:      getTaskManagerStatefulSetName(clusterName),
+		},
+		observedSvc)
+}
+
 func (observer *ClusterStateObserver) observeStatefulSet(
 	namespace string,
 	name string,
diff --git a/controllers/flinkcluster/flinkcluster_reconciler.go b/controllers/flinkcluster/flinkcluster_reconciler.go
index 74c46e74..bb56985b 100644
--- a/controllers/flinkcluster/flinkcluster_reconciler.go
+++ b/controllers/flinkcluster/flinkcluster_reconciler.go
@@ -110,6 +110,10 @@ func (reconciler *ClusterReconciler) reconcile() (ctrl.Result, error) {
 	if err != nil {
 		return ctrl.Result{}, err
 	}
+	err = reconciler.reconcileTaskManagerService()
+	if err != nil {
+		return ctrl.Result{}, err
+	}
 
 	err = reconciler.reconcilePersistentVolumeClaims()
 	if err != nil {
@@ -201,6 +205,61 @@ func (reconciler *ClusterReconciler) reconcileStatefulSet(
 	return nil
 }
 
+// Reconciles the TaskManager headless Service: creates it when it is desired
+// but not observed, deletes it when it is observed but no longer desired, and
+// otherwise leaves it untouched (an existing Service is never updated here).
+func (reconciler *ClusterReconciler) reconcileTaskManagerService() error {
+	var desiredSvc = reconciler.desired.TmService
+	var observedSvc = reconciler.observed.tmService
+
+	if desiredSvc != nil && observedSvc == nil {
+		return reconciler.createTaskManagerService(desiredSvc, "TaskManager Service")
+	}
+
+	if desiredSvc == nil && observedSvc != nil {
+		return reconciler.deleteTaskManagerService(observedSvc, "TaskManager Service")
+	}
+	return nil
+}
+
+// Creates the TaskManager Service through the Kubernetes client. The
+// component name is attached to every log entry; a failure is logged as an
+// error and returned to the caller unchanged.
+func (reconciler *ClusterReconciler) createTaskManagerService(
+	svc *corev1.Service, component string) error {
+	var context = reconciler.context
+	var log = reconciler.log.WithValues("component", component)
+	var k8sClient = reconciler.k8sClient
+
+	log.Info("Creating TaskManager Service", "TaskManager Service", *svc)
+	var err = k8sClient.Create(context, svc)
+	if err != nil {
+		log.Error(err, "Failed to create TaskManager Service")
+	} else {
+		log.Info("TaskManager Service created")
+	}
+	return err
+}
+
+// Deletes the TaskManager Service through the Kubernetes client. A NotFound
+// error is treated as success, since the Service is already gone.
+func (reconciler *ClusterReconciler) deleteTaskManagerService(
+	svc *corev1.Service, component string) error {
+	var context = reconciler.context
+	var log = reconciler.log.WithValues("component", component)
+	var k8sClient = reconciler.k8sClient
+
+	log.Info("Deleting TaskManager Service", "TaskManager Service", svc)
+	var err = k8sClient.Delete(context, svc)
+	err = client.IgnoreNotFound(err)
+	if err != nil {
+		log.Error(err, "Failed to delete TaskManager Service")
+	} else {
+		log.Info("TaskManager Service deleted")
+	}
+	return err
+}
+
 func (reconciler *ClusterReconciler) createStatefulSet(
 	statefulSet *appsv1.StatefulSet, component string) error {
 	var context = reconciler.context
diff --git a/controllers/flinkcluster/flinkcluster_util.go b/controllers/flinkcluster/flinkcluster_util.go
index d9bc1aab..61ed9824 100644
--- a/controllers/flinkcluster/flinkcluster_util.go
+++ b/controllers/flinkcluster/flinkcluster_util.go
@@ -384,8 +384,9 @@ func isUpdatedAll(observed ObservedClusterState) bool {
 	components := []runtime.Object{
 		observed.configMap,
 		observed.podDisruptionBudget,
-		observed.jmStatefulSet,
 		observed.tmStatefulSet,
+		observed.tmService,
+		observed.jmStatefulSet,
 		observed.jmService,
 		observed.jmIngress,
 		observed.flinkJobSubmitter.job,
@@ -401,8 +402,9 @@ func isClusterUpdateToDate(observed *ObservedClusterState) bool {
 	components := []runtime.Object{
 		observed.configMap,
 		observed.podDisruptionBudget,
-		observed.jmStatefulSet,
 		observed.tmStatefulSet,
+		observed.tmService,
+		observed.jmStatefulSet,
 		observed.jmService,
 	}
 	return areComponentsUpdated(components, observed.cluster)
diff --git a/controllers/flinkcluster/flinkcluster_util_test.go b/controllers/flinkcluster/flinkcluster_util_test.go
index 32da9771..9942d9b4 100644
--- a/controllers/flinkcluster/flinkcluster_util_test.go
+++ b/controllers/flinkcluster/flinkcluster_util_test.go
@@ -269,6 +269,7 @@ func TestGetUpdateState(t *testing.T) {
 		podDisruptionBudget: &policyv1.PodDisruptionBudget{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
 		jmStatefulSet:       &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
 		tmStatefulSet:       &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
+		tmService:           &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
 		jmService:           &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
 	}
 	var state = getUpdateState(&observed)
@@ -305,6 +306,7 @@ func TestGetUpdateState(t *testing.T) {
 		jmStatefulSet:       &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
 		tmStatefulSet:       &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
 		jmService:           &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
+		tmService:           &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
 		jmIngress:           &networkingv1.Ingress{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
 	}
 	state = getUpdateState(&observed)
diff --git a/internal/model/model.go b/internal/model/model.go
index c7c165e0..4745e7e9 100644
--- a/internal/model/model.go
+++ b/internal/model/model.go
@@ -27,6 +27,7 @@ type DesiredClusterState struct {
 	JmService           *corev1.Service
 	JmIngress           *networkingv1.Ingress
 	TmStatefulSet       *appsv1.StatefulSet
+	TmService           *corev1.Service
 	ConfigMap           *corev1.ConfigMap
 	Job                 *batchv1.Job
 	PodDisruptionBudget *policyv1.PodDisruptionBudget