diff --git a/charts/spark-operator-chart/Chart.yaml b/charts/spark-operator-chart/Chart.yaml
index ab6ed61b3c..415c816335 100644
--- a/charts/spark-operator-chart/Chart.yaml
+++ b/charts/spark-operator-chart/Chart.yaml
@@ -1,8 +1,8 @@
 apiVersion: v2
 name: spark-operator
 description: A Helm chart for Spark on Kubernetes operator
-version: 1.1.19
-appVersion: v1beta2-1.3.3-3.1.1
+version: 1.1.20
+appVersion: v1beta2-1.3.4-3.1.1
 keywords:
 - spark
 home: https://github.com/GoogleCloudPlatform/spark-on-k8s-operator
diff --git a/main.go b/main.go
index b2ddbba1c0..080af47dbe 100644
--- a/main.go
+++ b/main.go
@@ -73,6 +73,7 @@ var (
 	metricsPort = flag.String("metrics-port", "10254", "Port for the metrics endpoint.")
 	metricsEndpoint = flag.String("metrics-endpoint", "/metrics", "Metrics endpoint.")
 	metricsPrefix = flag.String("metrics-prefix", "", "Prefix for the metrics.")
+	ingressClassName = flag.String("ingress-class-name", "", "Set ingressClassName for ingress resources created.")
 	metricsLabels util.ArrayFlags
 	metricsJobStartLatencyBuckets util.HistogramBuckets = util.DefaultJobStartLatencyBuckets
 )
@@ -184,7 +185,7 @@ func main() {
 	}
 
 	applicationController := sparkapplication.NewController(
-		crClient, kubeClient, crInformerFactory, podInformerFactory, metricConfig, *namespace, *ingressURLFormat, batchSchedulerMgr, *enableUIService)
+		crClient, kubeClient, crInformerFactory, podInformerFactory, metricConfig, *namespace, *ingressURLFormat, *ingressClassName, batchSchedulerMgr, *enableUIService)
 	scheduledApplicationController := scheduledsparkapplication.NewController(
 		crClient, kubeClient, apiExtensionsClient, crInformerFactory, clock.RealClock{})
diff --git a/pkg/controller/sparkapplication/controller.go b/pkg/controller/sparkapplication/controller.go
index 81c36345f1..baf8d290d5 100644
--- a/pkg/controller/sparkapplication/controller.go
+++ b/pkg/controller/sparkapplication/controller.go
@@ -76,6 +76,7 @@ type Controller struct {
 	applicationLister crdlisters.SparkApplicationLister
 	podLister v1.PodLister
 	ingressURLFormat string
+	ingressClassName string
 	batchSchedulerMgr *batchscheduler.SchedulerManager
 	enableUIService bool
 }
@@ -89,6 +90,7 @@ func NewController(
 	metricsConfig *util.MetricConfig,
 	namespace string,
 	ingressURLFormat string,
+	ingressClassName string,
 	batchSchedulerMgr *batchscheduler.SchedulerManager,
 	enableUIService bool) *Controller {
 	crdscheme.AddToScheme(scheme.Scheme)
@@ -100,7 +102,7 @@ func NewController(
 	})
 	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{Component: "spark-operator"})
 
-	return newSparkApplicationController(crdClient, kubeClient, crdInformerFactory, podInformerFactory, recorder, metricsConfig, ingressURLFormat, batchSchedulerMgr, enableUIService)
+	return newSparkApplicationController(crdClient, kubeClient, crdInformerFactory, podInformerFactory, recorder, metricsConfig, ingressURLFormat, ingressClassName, batchSchedulerMgr, enableUIService)
 }
 
 func newSparkApplicationController(
@@ -111,6 +113,7 @@ func newSparkApplicationController(
 	eventRecorder record.EventRecorder,
 	metricsConfig *util.MetricConfig,
 	ingressURLFormat string,
+	ingressClassName string,
 	batchSchedulerMgr *batchscheduler.SchedulerManager,
 	enableUIService bool) *Controller {
 	queue := workqueue.NewNamedRateLimitingQueue(&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(queueTokenRefillRate), queueTokenBucketSize)},
@@ -122,6 +125,7 @@ func newSparkApplicationController(
 		recorder:          eventRecorder,
 		queue:             queue,
 		ingressURLFormat:  ingressURLFormat,
+		ingressClassName:  ingressClassName,
 		batchSchedulerMgr: batchSchedulerMgr,
 		enableUIService:   enableUIService,
 	}
@@ -687,7 +691,7 @@ func (c *Controller) submitSparkApplication(app *v1beta2.SparkApplication) *v1be
 				app.Spec.SparkConf["spark.ui.proxyBase"] = ingressURL.Path
 				app.Spec.SparkConf["spark.ui.proxyRedirectUri"] = "/"
 			}
-			ingress, err := createSparkUIIngress(app, *service, ingressURL, c.kubeClient)
+			ingress, err := createSparkUIIngress(app, *service, ingressURL, c.ingressClassName, c.kubeClient)
 			if err != nil {
 				glog.Errorf("failed to create UI Ingress for SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
 			} else {
diff --git a/pkg/controller/sparkapplication/controller_test.go b/pkg/controller/sparkapplication/controller_test.go
index 3bcb60c4fe..1dcb8e549f 100644
--- a/pkg/controller/sparkapplication/controller_test.go
+++ b/pkg/controller/sparkapplication/controller_test.go
@@ -68,7 +68,7 @@ func newFakeController(app *v1beta2.SparkApplication, pods ...*apiv1.Pod) (*Cont
 
 	podInformerFactory := informers.NewSharedInformerFactory(kubeClient, 0*time.Second)
 	controller := newSparkApplicationController(crdClient, kubeClient, informerFactory, podInformerFactory, recorder,
-		&util.MetricConfig{}, "", nil, true)
+		&util.MetricConfig{}, "", "", nil, true)
 
 	informer := informerFactory.Sparkoperator().V1beta2().SparkApplications().Informer()
 	if app != nil {
@@ -1619,6 +1619,52 @@ func TestIngressWithSubpathAffectsSparkConfiguration(t *testing.T) {
 	}
 }
 
+func TestIngressWithClassName(t *testing.T) {
+	os.Setenv(kubernetesServiceHostEnvVar, "localhost")
+	os.Setenv(kubernetesServicePortEnvVar, "443")
+
+	appName := "ingressaffectssparkconfig"
+
+	app := &v1beta2.SparkApplication{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      appName,
+			Namespace: "test",
+		},
+		Spec: v1beta2.SparkApplicationSpec{
+			RestartPolicy: v1beta2.RestartPolicy{
+				Type: v1beta2.Never,
+			},
+			TimeToLiveSeconds: int64ptr(1),
+		},
+		Status: v1beta2.SparkApplicationStatus{},
+	}
+
+	ctrl, _ := newFakeController(app)
+	ctrl.ingressURLFormat = "{{$appNamespace}}.{{$appName}}.example.com"
+	ctrl.ingressClassName = "nginx"
+	ctrl.enableUIService = true
+	_, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Create(context.TODO(), app, metav1.CreateOptions{})
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name))
+	assert.Nil(t, err)
+	_, err = ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
+	if err != nil {
+		t.Fatal(err)
+	}
+	ingresses, err := ctrl.kubeClient.NetworkingV1().Ingresses(app.Namespace).List(context.TODO(), metav1.ListOptions{})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if ingresses == nil || ingresses.Items == nil || len(ingresses.Items) != 1 {
+		t.Fatal("The ingress does not exist, has no items, or wrong amount of items")
+	}
+	if ingresses.Items[0].Spec.IngressClassName == nil || *ingresses.Items[0].Spec.IngressClassName != "nginx" {
+		t.Fatal("The ingressClassName does not exists, or wrong value is set")
+	}
+}
+
 func stringptr(s string) *string {
 	return &s
 }
diff --git a/pkg/controller/sparkapplication/sparkui.go b/pkg/controller/sparkapplication/sparkui.go
index 2a0d09f9a9..c3c856aafa 100644
--- a/pkg/controller/sparkapplication/sparkui.go
+++ b/pkg/controller/sparkapplication/sparkui.go
@@ -75,21 +75,22 @@ type SparkService struct {
 
 // SparkIngress encapsulates information about the driver UI ingress.
 type SparkIngress struct {
-	ingressName string
-	ingressURL  *url.URL
-	annotations map[string]string
-	ingressTLS  []networkingv1.IngressTLS
+	ingressName      string
+	ingressURL       *url.URL
+	ingressClassName string
+	annotations      map[string]string
+	ingressTLS       []networkingv1.IngressTLS
 }
 
-func createSparkUIIngress(app *v1beta2.SparkApplication, service SparkService, ingressURL *url.URL, kubeClient clientset.Interface) (*SparkIngress, error) {
+func createSparkUIIngress(app *v1beta2.SparkApplication, service SparkService, ingressURL *url.URL, ingressClassName string, kubeClient clientset.Interface) (*SparkIngress, error) {
 	if util.IngressCapabilities.Has("networking.k8s.io/v1") {
-		return createSparkUIIngress_v1(app, service, ingressURL, kubeClient)
+		return createSparkUIIngress_v1(app, service, ingressURL, ingressClassName, kubeClient)
 	} else {
 		return createSparkUIIngress_legacy(app, service, ingressURL, kubeClient)
 	}
 }
 
-func createSparkUIIngress_v1(app *v1beta2.SparkApplication, service SparkService, ingressURL *url.URL, kubeClient clientset.Interface) (*SparkIngress, error) {
+func createSparkUIIngress_v1(app *v1beta2.SparkApplication, service SparkService, ingressURL *url.URL, ingressClassName string, kubeClient clientset.Interface) (*SparkIngress, error) {
 	ingressResourceAnnotations := getIngressResourceAnnotations(app)
 	ingressTlsHosts := getIngressTlsHosts(app)
@@ -145,16 +146,21 @@ func createSparkUIIngress_v1(app *v1beta2.SparkApplication, service SparkService
 	if len(ingressTlsHosts) != 0 {
 		ingress.Spec.TLS = ingressTlsHosts
 	}
+	if len(ingressClassName) != 0 {
+		ingress.Spec.IngressClassName = &ingressClassName
+	}
+
 	glog.Infof("Creating an Ingress %s for the Spark UI for application %s", ingress.Name, app.Name)
 	_, err := kubeClient.NetworkingV1().Ingresses(ingress.Namespace).Create(context.TODO(), &ingress, metav1.CreateOptions{})
 	if err != nil {
 		return nil, err
 	}
 	return &SparkIngress{
-		ingressName: ingress.Name,
-		ingressURL:  ingressURL,
-		annotations: ingress.Annotations,
-		ingressTLS:  ingressTlsHosts,
+		ingressName:      ingress.Name,
+		ingressURL:       ingressURL,
+		ingressClassName: ingressClassName,
+		annotations:      ingress.Annotations,
+		ingressTLS:       ingressTlsHosts,
 	}, nil
 }
diff --git a/pkg/controller/sparkapplication/sparkui_test.go b/pkg/controller/sparkapplication/sparkui_test.go
index f6b8128a2e..acec9b7afc 100644
--- a/pkg/controller/sparkapplication/sparkui_test.go
+++ b/pkg/controller/sparkapplication/sparkui_test.go
@@ -330,7 +330,7 @@ func TestCreateSparkUIIngress(t *testing.T) {
 		expectError bool
 	}
 
-	testFn := func(test testcase, t *testing.T, ingressURLFormat string) {
+	testFn := func(test testcase, t *testing.T, ingressURLFormat string, ingressClassName string) {
 		fakeClient := fake.NewSimpleClientset()
 		sparkService, err := createSparkUIService(test.app, fakeClient)
 		if err != nil {
@@ -340,7 +340,7 @@ func TestCreateSparkUIIngress(t *testing.T) {
 		if err != nil {
 			t.Fatal(err)
 		}
-		sparkIngress, err := createSparkUIIngress(test.app, *sparkService, ingressURL, fakeClient)
+		sparkIngress, err := createSparkUIIngress(test.app, *sparkService, ingressURL, ingressClassName, fakeClient)
 		if err != nil {
 			if test.expectError {
 				return
@@ -492,6 +492,7 @@ func TestCreateSparkUIIngress(t *testing.T) {
 			},
 		},
 	}
+
 	testcases := []testcase{
 		{
 			name: "simple ingress object",
@@ -550,7 +551,7 @@ func TestCreateSparkUIIngress(t *testing.T) {
 	}
 
 	for _, test := range testcases {
-		testFn(test, t, "{{$appName}}.ingress.clusterName.com")
+		testFn(test, t, "{{$appName}}.ingress.clusterName.com", "")
 	}
 
 	testcases = []testcase{
@@ -569,7 +570,23 @@ func TestCreateSparkUIIngress(t *testing.T) {
 	}
 
 	for _, test := range testcases {
-		testFn(test, t, "ingress.clusterName.com/{{$appNamespace}}/{{$appName}}")
+		testFn(test, t, "ingress.clusterName.com/{{$appNamespace}}/{{$appName}}", "")
+	}
+
+	testcases = []testcase{
+		{
+			name: "simple ingress object with ingressClassName set",
+			app:  app1,
+			expectedIngress: SparkIngress{
+				ingressName:      fmt.Sprintf("%s-ui-ingress", app1.GetName()),
+				ingressURL:       parseURLAndAssertError(app1.GetName()+".ingress.clusterName.com", t),
+				ingressClassName: "nginx",
+			},
+			expectError: false,
+		},
+	}
+	for _, test := range testcases {
+		testFn(test, t, "{{$appName}}.ingress.clusterName.com", "nginx")
+	}
 }
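
The change is additive: when the new --ingress-class-name flag is left empty, the generated Ingress is unchanged; when it is set, the value is written to spec.ingressClassName of the networking.k8s.io/v1 Ingress the controller creates for the driver UI (the legacy createSparkUIIngress_legacy path is untouched). A minimal sketch of the expected result, assuming an operator started with --ingress-url-format='{{$appNamespace}}.{{$appName}}.example.com' and --ingress-class-name=nginx for a hypothetical application named spark-pi in the default namespace; the host and the "-ui-ingress" name suffix follow the patterns asserted in the tests above, while the rules block is abbreviated and illustrative:

    apiVersion: networking.k8s.io/v1
    kind: Ingress
    metadata:
      name: spark-pi-ui-ingress            # "<app name>-ui-ingress", matching the ingressName asserted in the tests
      namespace: default
    spec:
      ingressClassName: nginx              # new: copied from --ingress-class-name when the flag is non-empty
      rules:
        - host: default.spark-pi.example.com   # rendered from the --ingress-url-format template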