Skip to content

Commit

Permalink
Add Headless service for TaskManagers (#405)
Browse files Browse the repository at this point in the history
Co-authored-by: Barna Kutassy <[email protected]>
  • Loading branch information
Barnabas Kutassy and Barna Kutassy authored May 25, 2022
1 parent 29c37b5 commit a0eb731
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 2 deletions.
5 changes: 5 additions & 0 deletions controllers/flinkcluster/flinkcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@ func (handler *FlinkClusterHandler) reconcile(ctx context.Context,
} else {
log.Info("Desired state", "PodDisruptionBudget", "nil")
}
if desired.TmService != nil {
log.Info("Desired state", "TaskManager Service", *desired.TmService)
} else {
log.Info("Desired state", "TaskManager Service", "nil")
}
if desired.JmStatefulSet != nil {
log.Info("Desired state", "JobManager StatefulSet", *desired.JmStatefulSet)
} else {
Expand Down
50 changes: 50 additions & 0 deletions controllers/flinkcluster/flinkcluster_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,11 @@ func getDesiredClusterState(observed *ObservedClusterState) *model.DesiredCluste
if !shouldCleanup(cluster, "ConfigMap") {
state.ConfigMap = newConfigMap(cluster)
}

if !shouldCleanup(cluster, "PodDisruptionBudget") {
state.PodDisruptionBudget = newPodDisruptionBudget(cluster)
}

if !shouldCleanup(cluster, "JobManagerStatefulSet") && !applicationMode {
state.JmStatefulSet = newJobManagerStatefulSet(cluster)
}
Expand All @@ -100,6 +102,10 @@ func getDesiredClusterState(observed *ObservedClusterState) *model.DesiredCluste
state.TmStatefulSet = newTaskManagerStatefulSet(cluster)
}

if !shouldCleanup(cluster, "TaskManagerService") {
state.TmService = newTaskManagerService(cluster)
}

if !shouldCleanup(cluster, "JobManagerService") {
state.JmService = newJobManagerService(cluster)
}
Expand Down Expand Up @@ -557,6 +563,50 @@ func newPodDisruptionBudget(flinkCluster *v1beta1.FlinkCluster) *policyv1.PodDis
}
}

// Gets the desired TaskManager Headless Service.
func newTaskManagerService(flinkCluster *v1beta1.FlinkCluster) *corev1.Service {
var tmSpec = flinkCluster.Spec.TaskManager
if tmSpec == nil {
return nil
}
var clusterNamespace = flinkCluster.Namespace
var clusterName = flinkCluster.Name
// Service name matches the service name defined in the TM StatefulSet spec
var tmSvcName = getTaskManagerStatefulSetName(clusterName)
var labels = getClusterLabels(flinkCluster)
var tmSelector = getComponentLabels(flinkCluster, "taskmanager")

var tmSvcPorts = []corev1.ServicePort{
{
Name: "data",
Port: *tmSpec.Ports.Data,
},
{
Name: "rpc",
Port: *tmSpec.Ports.RPC,
},
{
Name: "query",
Port: *tmSpec.Ports.Query,
},
}

return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: clusterNamespace,
Name: tmSvcName,
OwnerReferences: []metav1.OwnerReference{ToOwnerReference(flinkCluster)},
Labels: labels,
},
Spec: corev1.ServiceSpec{
Selector: tmSelector,
ClusterIP: corev1.ClusterIPNone,
Type: corev1.ServiceTypeClusterIP,
Ports: tmSvcPorts,
},
}
}

// Gets the desired configMap.
func newConfigMap(flinkCluster *v1beta1.FlinkCluster) *corev1.ConfigMap {
appVersion, _ := version.NewVersion(flinkCluster.Spec.FlinkVersion)
Expand Down
30 changes: 30 additions & 0 deletions controllers/flinkcluster/flinkcluster_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type ObservedClusterState struct {
jmService *corev1.Service
jmIngress *networkingv1.Ingress
tmStatefulSet *appsv1.StatefulSet
tmService *corev1.Service
podDisruptionBudget *policyv1.PodDisruptionBudget
persistentVolumeClaims *corev1.PersistentVolumeClaimList
flinkJob FlinkJob
Expand Down Expand Up @@ -252,6 +253,21 @@ func (observer *ClusterStateObserver) observe(
observed.tmStatefulSet = observedTmStatefulSet
}

// TaskManager Service.
var observedTmSvc = new(corev1.Service)
err = observer.observeTaskManagerService(observedTmSvc)
if err != nil {
if client.IgnoreNotFound(err) != nil {
log.Error(err, "Failed to get TaskManager Service")
return err
}
log.Info("Observed TaskManager Service", "state", "nil")
observedTmSvc = nil
} else {
log.Info("Observed TaskManager Service", "state", *observedTmSvc)
observed.tmService = observedTmSvc
}

// (Optional) Savepoint.
var observedSavepoint Savepoint
err = observer.observeSavepoint(observed.cluster, &observedSavepoint)
Expand Down Expand Up @@ -494,6 +510,20 @@ func (observer *ClusterStateObserver) observeTaskManagerStatefulSet(
clusterNamespace, tmStatefulSetName, "TaskManager", observedStatefulSet)
}

func (observer *ClusterStateObserver) observeTaskManagerService(
observedSvc *corev1.Service) error {
var clusterNamespace = observer.request.Namespace
var clusterName = observer.request.Name

return observer.k8sClient.Get(
observer.context,
types.NamespacedName{
Namespace: clusterNamespace,
Name: getTaskManagerStatefulSetName(clusterName),
},
observedSvc)
}

func (observer *ClusterStateObserver) observeStatefulSet(
namespace string,
name string,
Expand Down
52 changes: 52 additions & 0 deletions controllers/flinkcluster/flinkcluster_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ func (reconciler *ClusterReconciler) reconcile() (ctrl.Result, error) {
if err != nil {
return ctrl.Result{}, err
}
err = reconciler.reconcileTaskManagerService()
if err != nil {
return ctrl.Result{}, err
}

err = reconciler.reconcilePersistentVolumeClaims()
if err != nil {
Expand Down Expand Up @@ -201,6 +205,54 @@ func (reconciler *ClusterReconciler) reconcileStatefulSet(
return nil
}

func (reconciler *ClusterReconciler) reconcileTaskManagerService() error {
var desiredSvc = reconciler.desired.TmService
var observedSvc = reconciler.observed.tmService

if desiredSvc != nil && observedSvc == nil {
return reconciler.createTaskManagerService(desiredSvc, "TaskManager Service")
}

if desiredSvc == nil && observedSvc != nil {
return reconciler.deleteTaskManagerService(observedSvc, "TaskManager Service")
}
return nil

}

func (reconciler *ClusterReconciler) createTaskManagerService(
svc *corev1.Service, component string) error {
var context = reconciler.context
var log = reconciler.log.WithValues("component", component)
var k8sClient = reconciler.k8sClient

log.Info("Creating TaskManager Service", "TaskManager Service", *svc)
var err = k8sClient.Create(context, svc)
if err != nil {
log.Info("Failed to create TaskManager Service", "error", err)
} else {
log.Info("TaskManager Service created")
}
return err
}

func (reconciler *ClusterReconciler) deleteTaskManagerService(
svc *corev1.Service, component string) error {
var context = reconciler.context
var log = reconciler.log.WithValues("component", component)
var k8sClient = reconciler.k8sClient

log.Info("Deleting TaskManager Service", "TaskManager Service", svc)
var err = k8sClient.Delete(context, svc)
err = client.IgnoreNotFound(err)
if err != nil {
log.Error(err, "Failed to delete TaskManager Service")
} else {
log.Info("TaskManager Service deleted")
}
return err
}

func (reconciler *ClusterReconciler) createStatefulSet(
statefulSet *appsv1.StatefulSet, component string) error {
var context = reconciler.context
Expand Down
6 changes: 4 additions & 2 deletions controllers/flinkcluster/flinkcluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,9 @@ func isUpdatedAll(observed ObservedClusterState) bool {
components := []runtime.Object{
observed.configMap,
observed.podDisruptionBudget,
observed.jmStatefulSet,
observed.tmStatefulSet,
observed.tmService,
observed.jmStatefulSet,
observed.jmService,
observed.jmIngress,
observed.flinkJobSubmitter.job,
Expand All @@ -401,8 +402,9 @@ func isClusterUpdateToDate(observed *ObservedClusterState) bool {
components := []runtime.Object{
observed.configMap,
observed.podDisruptionBudget,
observed.jmStatefulSet,
observed.tmStatefulSet,
observed.tmService,
observed.jmStatefulSet,
observed.jmService,
}
return areComponentsUpdated(components, observed.cluster)
Expand Down
2 changes: 2 additions & 0 deletions controllers/flinkcluster/flinkcluster_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ func TestGetUpdateState(t *testing.T) {
podDisruptionBudget: &policyv1.PodDisruptionBudget{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
jmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
tmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
tmService: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
jmService: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
}
var state = getUpdateState(&observed)
Expand Down Expand Up @@ -305,6 +306,7 @@ func TestGetUpdateState(t *testing.T) {
jmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
tmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
jmService: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
tmService: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
jmIngress: &networkingv1.Ingress{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
}
state = getUpdateState(&observed)
Expand Down
1 change: 1 addition & 0 deletions internal/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type DesiredClusterState struct {
JmService *corev1.Service
JmIngress *networkingv1.Ingress
TmStatefulSet *appsv1.StatefulSet
TmService *corev1.Service
ConfigMap *corev1.ConfigMap
Job *batchv1.Job
PodDisruptionBudget *policyv1.PodDisruptionBudget
Expand Down

0 comments on commit a0eb731

Please sign in to comment.