From 2749ad0c1d50a2cd482ea9625086ad2865b68748 Mon Sep 17 00:00:00 2001 From: Illia Pshonkin Date: Mon, 21 Mar 2022 14:48:54 +0200 Subject: [PATCH] Add ability to add annotations and labels to JobManager service (#320) --- apis/flinkcluster/v1beta1/flinkcluster_types.go | 6 ++++++ apis/flinkcluster/v1beta1/zz_generated.deepcopy.go | 14 ++++++++++++++ .../bases/flinkoperator.k8s.io_flinkclusters.yaml | 8 ++++++++ controllers/flinkcluster/flinkcluster_converter.go | 11 +++++++---- .../flinkcluster/flinkcluster_converter_test.go | 3 --- .../templates/flink-cluster-crd.yaml | 8 ++++++++ 6 files changed, 43 insertions(+), 7 deletions(-) diff --git a/apis/flinkcluster/v1beta1/flinkcluster_types.go b/apis/flinkcluster/v1beta1/flinkcluster_types.go index ed487380..94b5f746 100644 --- a/apis/flinkcluster/v1beta1/flinkcluster_types.go +++ b/apis/flinkcluster/v1beta1/flinkcluster_types.go @@ -195,6 +195,12 @@ type JobManagerSpec struct { // Currently `VPC, External` are only available for GKE. AccessScope string `json:"accessScope,omitempty"` + // _(Optional)_ Define JobManager Service annotations for configuration. + ServiceAnnotations map[string]string `json:"ServiceAnnotations,omitempty"` + + // _(Optional)_ Define JobManager Service labels for configuration. + ServiceLabels map[string]string `json:"ServiceLabels,omitempty"` + // _(Optional)_ Provide external access to JobManager UI/API. Ingress *JobManagerIngressSpec `json:"ingress,omitempty"` diff --git a/apis/flinkcluster/v1beta1/zz_generated.deepcopy.go b/apis/flinkcluster/v1beta1/zz_generated.deepcopy.go index 26210fd7..37c7123b 100644 --- a/apis/flinkcluster/v1beta1/zz_generated.deepcopy.go +++ b/apis/flinkcluster/v1beta1/zz_generated.deepcopy.go @@ -494,6 +494,20 @@ func (in *JobManagerSpec) DeepCopyInto(out *JobManagerSpec) { *out = new(int32) **out = **in } + if in.ServiceAnnotations != nil { + in, out := &in.ServiceAnnotations, &out.ServiceAnnotations + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + if in.ServiceLabels != nil { + in, out := &in.ServiceLabels, &out.ServiceLabels + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } if in.Ingress != nil { in, out := &in.Ingress, &out.Ingress *out = new(JobManagerIngressSpec) diff --git a/config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml b/config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml index c91880b7..61aa5c39 100644 --- a/config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml +++ b/config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml @@ -1647,6 +1647,14 @@ spec: type: object jobManager: properties: + ServiceAnnotations: + additionalProperties: + type: string + type: object + ServiceLabels: + additionalProperties: + type: string + type: object accessScope: type: string extraPorts: diff --git a/controllers/flinkcluster/flinkcluster_converter.go b/controllers/flinkcluster/flinkcluster_converter.go index eaabed0f..09197130 100644 --- a/controllers/flinkcluster/flinkcluster_converter.go +++ b/controllers/flinkcluster/flinkcluster_converter.go @@ -272,14 +272,17 @@ func newJobManagerService(flinkCluster *v1beta1.FlinkCluster) *corev1.Service { var jobManagerServiceName = getJobManagerServiceName(clusterName) var podLabels = getComponentLabels(flinkCluster, "jobmanager") podLabels = mergeLabels(podLabels, jobManagerSpec.PodLabels) - var serviceLabels = mergeLabels(podLabels, getRevisionHashLabels(&flinkCluster.Status.Revision)) + var serviceLabels = mergeLabels(jobManagerSpec.ServiceLabels, getRevisionHashLabels(&flinkCluster.Status.Revision)) + var serviceAnnotations = jobManagerSpec.ServiceAnnotations + var jobManagerService = &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Namespace: clusterNamespace, Name: jobManagerServiceName, OwnerReferences: []metav1.OwnerReference{ ToOwnerReference(flinkCluster)}, - Labels: serviceLabels, + Labels: serviceLabels, + Annotations: serviceAnnotations, }, Spec: corev1.ServiceSpec{ Selector: podLabels, @@ -294,11 +297,11 @@ func newJobManagerService(flinkCluster *v1beta1.FlinkCluster) *corev1.Service { jobManagerService.Spec.Type = corev1.ServiceTypeClusterIP case v1beta1.AccessScopeVPC: jobManagerService.Spec.Type = corev1.ServiceTypeLoadBalancer - jobManagerService.Annotations = + jobManagerService.Annotations = mergeLabels(serviceAnnotations, map[string]string{ "networking.gke.io/load-balancer-type": "Internal", "networking.gke.io/internal-load-balancer-allow-global-access": "true", - } + }) case v1beta1.AccessScopeExternal: jobManagerService.Spec.Type = corev1.ServiceTypeLoadBalancer case v1beta1.AccessScopeNodePort: diff --git a/controllers/flinkcluster/flinkcluster_converter_test.go b/controllers/flinkcluster/flinkcluster_converter_test.go index 1cb59e05..3e5d0883 100644 --- a/controllers/flinkcluster/flinkcluster_converter_test.go +++ b/controllers/flinkcluster/flinkcluster_converter_test.go @@ -485,9 +485,6 @@ func TestGetDesiredClusterState(t *testing.T) { Name: "flinkjobcluster-sample-jobmanager", Namespace: "default", Labels: map[string]string{ - "app": "flink", - "cluster": "flinkjobcluster-sample", - "component": "jobmanager", RevisionNameLabel: "flinkjobcluster-sample-85dc8f749", }, Annotations: map[string]string{ diff --git a/helm-chart/flink-operator/templates/flink-cluster-crd.yaml b/helm-chart/flink-operator/templates/flink-cluster-crd.yaml index 59170975..a4f0ee3b 100644 --- a/helm-chart/flink-operator/templates/flink-cluster-crd.yaml +++ b/helm-chart/flink-operator/templates/flink-cluster-crd.yaml @@ -1659,6 +1659,14 @@ spec: properties: accessScope: type: string + ServiceAnnotations: + additionalProperties: + type: string + type: object + ServiceLabels: + additionalProperties: + type: string + type: object extraPorts: items: properties: