Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Headless service for TaskManagers #405

Merged
merged 1 commit into from
May 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions controllers/flinkcluster/flinkcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@ func (handler *FlinkClusterHandler) reconcile(ctx context.Context,
} else {
log.Info("Desired state", "PodDisruptionBudget", "nil")
}
if desired.TmService != nil {
log.Info("Desired state", "TaskManager Service", *desired.TmService)
} else {
log.Info("Desired state", "TaskManager Service", "nil")
}
if desired.JmStatefulSet != nil {
log.Info("Desired state", "JobManager StatefulSet", *desired.JmStatefulSet)
} else {
Expand Down
50 changes: 50 additions & 0 deletions controllers/flinkcluster/flinkcluster_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,11 @@ func getDesiredClusterState(observed *ObservedClusterState) *model.DesiredCluste
if !shouldCleanup(cluster, "ConfigMap") {
state.ConfigMap = newConfigMap(cluster)
}

if !shouldCleanup(cluster, "PodDisruptionBudget") {
state.PodDisruptionBudget = newPodDisruptionBudget(cluster)
}

if !shouldCleanup(cluster, "JobManagerStatefulSet") && !applicationMode {
state.JmStatefulSet = newJobManagerStatefulSet(cluster)
}
Expand All @@ -100,6 +102,10 @@ func getDesiredClusterState(observed *ObservedClusterState) *model.DesiredCluste
state.TmStatefulSet = newTaskManagerStatefulSet(cluster)
}

if !shouldCleanup(cluster, "TaskManagerService") {
state.TmService = newTaskManagerService(cluster)
}

if !shouldCleanup(cluster, "JobManagerService") {
state.JmService = newJobManagerService(cluster)
}
Expand Down Expand Up @@ -557,6 +563,50 @@ func newPodDisruptionBudget(flinkCluster *v1beta1.FlinkCluster) *policyv1.PodDis
}
}

// Gets the desired TaskManager Headless Service.
func newTaskManagerService(flinkCluster *v1beta1.FlinkCluster) *corev1.Service {
var tmSpec = flinkCluster.Spec.TaskManager
if tmSpec == nil {
return nil
}
var clusterNamespace = flinkCluster.Namespace
var clusterName = flinkCluster.Name
// Service name matches the service name defined in the TM StatefulSet spec
var tmSvcName = getTaskManagerStatefulSetName(clusterName)
var labels = getClusterLabels(flinkCluster)
var tmSelector = getComponentLabels(flinkCluster, "taskmanager")

var tmSvcPorts = []corev1.ServicePort{
{
Name: "data",
Port: *tmSpec.Ports.Data,
},
{
Name: "rpc",
Port: *tmSpec.Ports.RPC,
},
{
Name: "query",
Port: *tmSpec.Ports.Query,
},
}

return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: clusterNamespace,
Name: tmSvcName,
OwnerReferences: []metav1.OwnerReference{ToOwnerReference(flinkCluster)},
Labels: labels,
},
Spec: corev1.ServiceSpec{
Selector: tmSelector,
ClusterIP: corev1.ClusterIPNone,
Type: corev1.ServiceTypeClusterIP,
Ports: tmSvcPorts,
},
}
}

// Gets the desired configMap.
func newConfigMap(flinkCluster *v1beta1.FlinkCluster) *corev1.ConfigMap {
appVersion, _ := version.NewVersion(flinkCluster.Spec.FlinkVersion)
Expand Down
30 changes: 30 additions & 0 deletions controllers/flinkcluster/flinkcluster_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type ObservedClusterState struct {
jmService *corev1.Service
jmIngress *networkingv1.Ingress
tmStatefulSet *appsv1.StatefulSet
tmService *corev1.Service
podDisruptionBudget *policyv1.PodDisruptionBudget
persistentVolumeClaims *corev1.PersistentVolumeClaimList
flinkJob FlinkJob
Expand Down Expand Up @@ -252,6 +253,21 @@ func (observer *ClusterStateObserver) observe(
observed.tmStatefulSet = observedTmStatefulSet
}

// TaskManager Service.
var observedTmSvc = new(corev1.Service)
err = observer.observeTaskManagerService(observedTmSvc)
if err != nil {
if client.IgnoreNotFound(err) != nil {
log.Error(err, "Failed to get TaskManager Service")
return err
}
log.Info("Observed TaskManager Service", "state", "nil")
observedTmSvc = nil
} else {
log.Info("Observed TaskManager Service", "state", *observedTmSvc)
observed.tmService = observedTmSvc
}

// (Optional) Savepoint.
var observedSavepoint Savepoint
err = observer.observeSavepoint(observed.cluster, &observedSavepoint)
Expand Down Expand Up @@ -494,6 +510,20 @@ func (observer *ClusterStateObserver) observeTaskManagerStatefulSet(
clusterNamespace, tmStatefulSetName, "TaskManager", observedStatefulSet)
}

func (observer *ClusterStateObserver) observeTaskManagerService(
observedSvc *corev1.Service) error {
var clusterNamespace = observer.request.Namespace
var clusterName = observer.request.Name

return observer.k8sClient.Get(
observer.context,
types.NamespacedName{
Namespace: clusterNamespace,
Name: getTaskManagerStatefulSetName(clusterName),
},
observedSvc)
}

func (observer *ClusterStateObserver) observeStatefulSet(
namespace string,
name string,
Expand Down
52 changes: 52 additions & 0 deletions controllers/flinkcluster/flinkcluster_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ func (reconciler *ClusterReconciler) reconcile() (ctrl.Result, error) {
if err != nil {
return ctrl.Result{}, err
}
err = reconciler.reconcileTaskManagerService()
if err != nil {
return ctrl.Result{}, err
}

err = reconciler.reconcilePersistentVolumeClaims()
if err != nil {
Expand Down Expand Up @@ -201,6 +205,54 @@ func (reconciler *ClusterReconciler) reconcileStatefulSet(
return nil
}

func (reconciler *ClusterReconciler) reconcileTaskManagerService() error {
var desiredSvc = reconciler.desired.TmService
var observedSvc = reconciler.observed.tmService

if desiredSvc != nil && observedSvc == nil {
return reconciler.createTaskManagerService(desiredSvc, "TaskManager Service")
}

if desiredSvc == nil && observedSvc != nil {
return reconciler.deleteTaskManagerService(observedSvc, "TaskManager Service")
}
return nil

}

func (reconciler *ClusterReconciler) createTaskManagerService(
svc *corev1.Service, component string) error {
var context = reconciler.context
var log = reconciler.log.WithValues("component", component)
var k8sClient = reconciler.k8sClient

log.Info("Creating TaskManager Service", "TaskManager Service", *svc)
var err = k8sClient.Create(context, svc)
if err != nil {
log.Info("Failed to create TaskManager Service", "error", err)
} else {
log.Info("TaskManager Service created")
}
return err
}

func (reconciler *ClusterReconciler) deleteTaskManagerService(
svc *corev1.Service, component string) error {
var context = reconciler.context
var log = reconciler.log.WithValues("component", component)
var k8sClient = reconciler.k8sClient

log.Info("Deleting TaskManager Service", "TaskManager Service", svc)
var err = k8sClient.Delete(context, svc)
err = client.IgnoreNotFound(err)
if err != nil {
log.Error(err, "Failed to delete TaskManager Service")
} else {
log.Info("TaskManager Service deleted")
}
return err
}

func (reconciler *ClusterReconciler) createStatefulSet(
statefulSet *appsv1.StatefulSet, component string) error {
var context = reconciler.context
Expand Down
6 changes: 4 additions & 2 deletions controllers/flinkcluster/flinkcluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,9 @@ func isUpdatedAll(observed ObservedClusterState) bool {
components := []runtime.Object{
observed.configMap,
observed.podDisruptionBudget,
observed.jmStatefulSet,
observed.tmStatefulSet,
observed.tmService,
observed.jmStatefulSet,
observed.jmService,
observed.jmIngress,
observed.flinkJobSubmitter.job,
Expand All @@ -401,8 +402,9 @@ func isClusterUpdateToDate(observed *ObservedClusterState) bool {
components := []runtime.Object{
observed.configMap,
observed.podDisruptionBudget,
observed.jmStatefulSet,
observed.tmStatefulSet,
observed.tmService,
observed.jmStatefulSet,
observed.jmService,
}
return areComponentsUpdated(components, observed.cluster)
Expand Down
2 changes: 2 additions & 0 deletions controllers/flinkcluster/flinkcluster_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ func TestGetUpdateState(t *testing.T) {
podDisruptionBudget: &policyv1.PodDisruptionBudget{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
jmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
tmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
tmService: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
jmService: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
}
var state = getUpdateState(&observed)
Expand Down Expand Up @@ -305,6 +306,7 @@ func TestGetUpdateState(t *testing.T) {
jmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
tmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
jmService: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
tmService: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
jmIngress: &networkingv1.Ingress{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
}
state = getUpdateState(&observed)
Expand Down
1 change: 1 addition & 0 deletions internal/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type DesiredClusterState struct {
JmService *corev1.Service
JmIngress *networkingv1.Ingress
TmStatefulSet *appsv1.StatefulSet
TmService *corev1.Service
ConfigMap *corev1.ConfigMap
Job *batchv1.Job
PodDisruptionBudget *policyv1.PodDisruptionBudget
Expand Down