Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ingress-class-name controller flag #1482

Merged
merged 2 commits into from
Apr 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions charts/spark-operator-chart/Chart.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
apiVersion: v2
name: spark-operator
description: A Helm chart for Spark on Kubernetes operator
version: 1.1.19
appVersion: v1beta2-1.3.3-3.1.1
version: 1.1.20
appVersion: v1beta2-1.3.4-3.1.1
keywords:
- spark
home: https://github.com/GoogleCloudPlatform/spark-on-k8s-operator
Expand Down
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ var (
metricsPort = flag.String("metrics-port", "10254", "Port for the metrics endpoint.")
metricsEndpoint = flag.String("metrics-endpoint", "/metrics", "Metrics endpoint.")
metricsPrefix = flag.String("metrics-prefix", "", "Prefix for the metrics.")
ingressClassName = flag.String("ingress-class-name", "", "Set ingressClassName for ingress resources created.")
metricsLabels util.ArrayFlags
metricsJobStartLatencyBuckets util.HistogramBuckets = util.DefaultJobStartLatencyBuckets
)
Expand Down Expand Up @@ -184,7 +185,7 @@ func main() {
}

applicationController := sparkapplication.NewController(
crClient, kubeClient, crInformerFactory, podInformerFactory, metricConfig, *namespace, *ingressURLFormat, batchSchedulerMgr, *enableUIService)
crClient, kubeClient, crInformerFactory, podInformerFactory, metricConfig, *namespace, *ingressURLFormat, *ingressClassName, batchSchedulerMgr, *enableUIService)
scheduledApplicationController := scheduledsparkapplication.NewController(
crClient, kubeClient, apiExtensionsClient, crInformerFactory, clock.RealClock{})

Expand Down
8 changes: 6 additions & 2 deletions pkg/controller/sparkapplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type Controller struct {
applicationLister crdlisters.SparkApplicationLister
podLister v1.PodLister
ingressURLFormat string
ingressClassName string
batchSchedulerMgr *batchscheduler.SchedulerManager
enableUIService bool
}
Expand All @@ -89,6 +90,7 @@ func NewController(
metricsConfig *util.MetricConfig,
namespace string,
ingressURLFormat string,
ingressClassName string,
batchSchedulerMgr *batchscheduler.SchedulerManager,
enableUIService bool) *Controller {
crdscheme.AddToScheme(scheme.Scheme)
Expand All @@ -100,7 +102,7 @@ func NewController(
})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{Component: "spark-operator"})

return newSparkApplicationController(crdClient, kubeClient, crdInformerFactory, podInformerFactory, recorder, metricsConfig, ingressURLFormat, batchSchedulerMgr, enableUIService)
return newSparkApplicationController(crdClient, kubeClient, crdInformerFactory, podInformerFactory, recorder, metricsConfig, ingressURLFormat, ingressClassName, batchSchedulerMgr, enableUIService)
}

func newSparkApplicationController(
Expand All @@ -111,6 +113,7 @@ func newSparkApplicationController(
eventRecorder record.EventRecorder,
metricsConfig *util.MetricConfig,
ingressURLFormat string,
ingressClassName string,
batchSchedulerMgr *batchscheduler.SchedulerManager,
enableUIService bool) *Controller {
queue := workqueue.NewNamedRateLimitingQueue(&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(queueTokenRefillRate), queueTokenBucketSize)},
Expand All @@ -122,6 +125,7 @@ func newSparkApplicationController(
recorder: eventRecorder,
queue: queue,
ingressURLFormat: ingressURLFormat,
ingressClassName: ingressClassName,
batchSchedulerMgr: batchSchedulerMgr,
enableUIService: enableUIService,
}
Expand Down Expand Up @@ -687,7 +691,7 @@ func (c *Controller) submitSparkApplication(app *v1beta2.SparkApplication) *v1be
app.Spec.SparkConf["spark.ui.proxyBase"] = ingressURL.Path
app.Spec.SparkConf["spark.ui.proxyRedirectUri"] = "/"
}
ingress, err := createSparkUIIngress(app, *service, ingressURL, c.kubeClient)
ingress, err := createSparkUIIngress(app, *service, ingressURL, c.ingressClassName, c.kubeClient)
if err != nil {
glog.Errorf("failed to create UI Ingress for SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
} else {
Expand Down
48 changes: 47 additions & 1 deletion pkg/controller/sparkapplication/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func newFakeController(app *v1beta2.SparkApplication, pods ...*apiv1.Pod) (*Cont

podInformerFactory := informers.NewSharedInformerFactory(kubeClient, 0*time.Second)
controller := newSparkApplicationController(crdClient, kubeClient, informerFactory, podInformerFactory, recorder,
&util.MetricConfig{}, "", nil, true)
&util.MetricConfig{}, "", "", nil, true)

informer := informerFactory.Sparkoperator().V1beta2().SparkApplications().Informer()
if app != nil {
Expand Down Expand Up @@ -1619,6 +1619,52 @@ func TestIngressWithSubpathAffectsSparkConfiguration(t *testing.T) {
}
}

func TestIngressWithClassName(t *testing.T) {
os.Setenv(kubernetesServiceHostEnvVar, "localhost")
os.Setenv(kubernetesServicePortEnvVar, "443")

appName := "ingressaffectssparkconfig"

app := &v1beta2.SparkApplication{
ObjectMeta: metav1.ObjectMeta{
Name: appName,
Namespace: "test",
},
Spec: v1beta2.SparkApplicationSpec{
RestartPolicy: v1beta2.RestartPolicy{
Type: v1beta2.Never,
},
TimeToLiveSeconds: int64ptr(1),
},
Status: v1beta2.SparkApplicationStatus{},
}

ctrl, _ := newFakeController(app)
ctrl.ingressURLFormat = "{{$appNamespace}}.{{$appName}}.example.com"
ctrl.ingressClassName = "nginx"
ctrl.enableUIService = true
_, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Create(context.TODO(), app, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
err = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name))
assert.Nil(t, err)
_, err = ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}
ingresses, err := ctrl.kubeClient.NetworkingV1().Ingresses(app.Namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
t.Fatal(err)
}
if ingresses == nil || ingresses.Items == nil || len(ingresses.Items) != 1 {
t.Fatal("The ingress does not exist, has no items, or wrong amount of items")
}
if ingresses.Items[0].Spec.IngressClassName == nil || *ingresses.Items[0].Spec.IngressClassName != "nginx" {
t.Fatal("The ingressClassName does not exists, or wrong value is set")
}
}

func stringptr(s string) *string {
return &s
}
Expand Down
28 changes: 17 additions & 11 deletions pkg/controller/sparkapplication/sparkui.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,21 +75,22 @@ type SparkService struct {

// SparkIngress encapsulates information about the driver UI ingress.
type SparkIngress struct {
ingressName string
ingressURL *url.URL
annotations map[string]string
ingressTLS []networkingv1.IngressTLS
ingressName string
ingressURL *url.URL
ingressClassName string
annotations map[string]string
ingressTLS []networkingv1.IngressTLS
}

func createSparkUIIngress(app *v1beta2.SparkApplication, service SparkService, ingressURL *url.URL, kubeClient clientset.Interface) (*SparkIngress, error) {
func createSparkUIIngress(app *v1beta2.SparkApplication, service SparkService, ingressURL *url.URL, ingressClassName string, kubeClient clientset.Interface) (*SparkIngress, error) {
if util.IngressCapabilities.Has("networking.k8s.io/v1") {
return createSparkUIIngress_v1(app, service, ingressURL, kubeClient)
return createSparkUIIngress_v1(app, service, ingressURL, ingressClassName, kubeClient)
} else {
return createSparkUIIngress_legacy(app, service, ingressURL, kubeClient)
}
}

func createSparkUIIngress_v1(app *v1beta2.SparkApplication, service SparkService, ingressURL *url.URL, kubeClient clientset.Interface) (*SparkIngress, error) {
func createSparkUIIngress_v1(app *v1beta2.SparkApplication, service SparkService, ingressURL *url.URL, ingressClassName string, kubeClient clientset.Interface) (*SparkIngress, error) {
ingressResourceAnnotations := getIngressResourceAnnotations(app)
ingressTlsHosts := getIngressTlsHosts(app)

Expand Down Expand Up @@ -145,16 +146,21 @@ func createSparkUIIngress_v1(app *v1beta2.SparkApplication, service SparkService
if len(ingressTlsHosts) != 0 {
ingress.Spec.TLS = ingressTlsHosts
}
if len(ingressClassName) != 0 {
ingress.Spec.IngressClassName = &ingressClassName
}

glog.Infof("Creating an Ingress %s for the Spark UI for application %s", ingress.Name, app.Name)
_, err := kubeClient.NetworkingV1().Ingresses(ingress.Namespace).Create(context.TODO(), &ingress, metav1.CreateOptions{})
if err != nil {
return nil, err
}
return &SparkIngress{
ingressName: ingress.Name,
ingressURL: ingressURL,
annotations: ingress.Annotations,
ingressTLS: ingressTlsHosts,
ingressName: ingress.Name,
ingressURL: ingressURL,
ingressClassName: ingressClassName,
annotations: ingress.Annotations,
ingressTLS: ingressTlsHosts,
}, nil
}

Expand Down
25 changes: 21 additions & 4 deletions pkg/controller/sparkapplication/sparkui_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func TestCreateSparkUIIngress(t *testing.T) {
expectError bool
}

testFn := func(test testcase, t *testing.T, ingressURLFormat string) {
testFn := func(test testcase, t *testing.T, ingressURLFormat string, ingressClassName string) {
fakeClient := fake.NewSimpleClientset()
sparkService, err := createSparkUIService(test.app, fakeClient)
if err != nil {
Expand All @@ -340,7 +340,7 @@ func TestCreateSparkUIIngress(t *testing.T) {
if err != nil {
t.Fatal(err)
}
sparkIngress, err := createSparkUIIngress(test.app, *sparkService, ingressURL, fakeClient)
sparkIngress, err := createSparkUIIngress(test.app, *sparkService, ingressURL, ingressClassName, fakeClient)
if err != nil {
if test.expectError {
return
Expand Down Expand Up @@ -492,6 +492,7 @@ func TestCreateSparkUIIngress(t *testing.T) {
},
},
}

testcases := []testcase{
{
name: "simple ingress object",
Expand Down Expand Up @@ -550,7 +551,7 @@ func TestCreateSparkUIIngress(t *testing.T) {
}

for _, test := range testcases {
testFn(test, t, "{{$appName}}.ingress.clusterName.com")
testFn(test, t, "{{$appName}}.ingress.clusterName.com", "")
}

testcases = []testcase{
Expand All @@ -569,7 +570,23 @@ func TestCreateSparkUIIngress(t *testing.T) {
}

for _, test := range testcases {
testFn(test, t, "ingress.clusterName.com/{{$appNamespace}}/{{$appName}}")
testFn(test, t, "ingress.clusterName.com/{{$appNamespace}}/{{$appName}}", "")
}

testcases = []testcase{
{
name: "simple ingress object with ingressClassName set",
app: app1,
expectedIngress: SparkIngress{
ingressName: fmt.Sprintf("%s-ui-ingress", app1.GetName()),
ingressURL: parseURLAndAssertError(app1.GetName()+".ingress.clusterName.com", t),
ingressClassName: "nginx",
},
expectError: false,
},
}
for _, test := range testcases {
testFn(test, t, "{{$appName}}.ingress.clusterName.com", "nginx")
}
}

Expand Down