Skip to content

Commit

Permalink
Add pod labels and annotations validation (#327)
Browse files Browse the repository at this point in the history
  • Loading branch information
regadas authored Mar 17, 2022
1 parent 688ae5b commit b036b9a
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 0 deletions.
34 changes: 34 additions & 0 deletions apis/flinkcluster/v1beta1/flinkcluster_validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ import (
"k8s.io/apimachinery/pkg/api/resource"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/validation"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1validation "k8s.io/apimachinery/pkg/apis/meta/v1/validation"
"k8s.io/apimachinery/pkg/util/validation/field"
)

const (
Expand Down Expand Up @@ -326,6 +329,17 @@ func (v *Validator) validateImage(imageSpec *ImageSpec) error {

func (v *Validator) validateJobManager(flinkVersion *version.Version, jmSpec *JobManagerSpec) error {
var err error
if jmSpec == nil {
return nil
}

fp := field.NewPath("spec.jobManager")
if errors := validation.ValidateAnnotations(jmSpec.PodAnnotations, fp.Child("podAnnotations")); len(errors) > 0 {
return fmt.Errorf(errors.ToAggregate().Error())
}
if errors := v1validation.ValidateLabels(jmSpec.PodLabels, fp.Child("podLabels")); len(errors) > 0 {
return fmt.Errorf(errors.ToAggregate().Error())
}

// Replicas.
if jmSpec.Replicas == nil || *jmSpec.Replicas != 1 {
Expand Down Expand Up @@ -409,6 +423,18 @@ func (v *Validator) validateJobManager(flinkVersion *version.Version, jmSpec *Jo
}

func (v *Validator) validateTaskManager(flinkVersion *version.Version, tmSpec *TaskManagerSpec) error {
if tmSpec == nil {
return nil
}

fp := field.NewPath("spec.taskManager")
if errors := validation.ValidateAnnotations(tmSpec.PodAnnotations, fp.Child("podAnnotations")); len(errors) > 0 {
return fmt.Errorf(errors.ToAggregate().Error())
}
if errors := v1validation.ValidateLabels(tmSpec.PodLabels, fp.Child("podLabels")); len(errors) > 0 {
return fmt.Errorf(errors.ToAggregate().Error())
}

// Replicas.
if tmSpec.Replicas == nil || *tmSpec.Replicas < 1 {
return fmt.Errorf("invalid TaskManager replicas, it must >= 1")
Expand Down Expand Up @@ -480,6 +506,14 @@ func (v *Validator) validateJob(jobSpec *JobSpec) error {
return nil
}

fp := field.NewPath("spec.job")
if errors := validation.ValidateAnnotations(jobSpec.PodAnnotations, fp.Child("podAnnotations")); len(errors) > 0 {
return fmt.Errorf(errors.ToAggregate().Error())
}
if errors := v1validation.ValidateLabels(jobSpec.PodLabels, fp.Child("podLabels")); len(errors) > 0 {
return fmt.Errorf(errors.ToAggregate().Error())
}

applicationMode := jobSpec.Mode != nil && *jobSpec.Mode == JobModeApplication
if !applicationMode && jobSpec.JarFile == nil && jobSpec.PyFile == nil && jobSpec.PyModule == nil {
return fmt.Errorf("job jarFile or pythonFile or pythonModule is unspecified")
Expand Down
94 changes: 94 additions & 0 deletions apis/flinkcluster/v1beta1/flinkcluster_validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package v1beta1
import (
"encoding/json"
"fmt"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -1085,3 +1086,96 @@ func getSimpleFlinkCluster() FlinkCluster {
},
}
}

func TestFlinkClusterValidation(t *testing.T) {
longName := strings.Repeat("a", 254)

invalidJobManagerAnnotations := func() *FlinkCluster {
cluster := getSimpleFlinkCluster()
cluster.Spec.JobManager.PodAnnotations = map[string]string{
longName: "bar",
}
return &cluster
}
invalidJobManagerLabels := func() *FlinkCluster {
cluster := getSimpleFlinkCluster()
cluster.Spec.JobManager.PodLabels = map[string]string{
longName: "bar",
}
return &cluster
}
invalidTaskManagerAnnotations := func() *FlinkCluster {
cluster := getSimpleFlinkCluster()
cluster.Spec.TaskManager.PodAnnotations = map[string]string{
longName: "bar",
}
return &cluster
}
invalidTaskManagerLabels := func() *FlinkCluster {
cluster := getSimpleFlinkCluster()
cluster.Spec.TaskManager.PodLabels = map[string]string{
longName: "bar",
}
return &cluster
}
invalidJobAnnotations := func() *FlinkCluster {
cluster := getSimpleFlinkCluster()
cluster.Spec.Job.PodAnnotations = map[string]string{
longName: "bar",
}
return &cluster
}
invalidJobLabels := func() *FlinkCluster {
cluster := getSimpleFlinkCluster()
cluster.Spec.Job.PodLabels = map[string]string{
longName: "bar",
}
return &cluster
}

data := []struct {
testName string
run func() *FlinkCluster
expectedErr string
}{
{
"invalid jm annotations",
invalidJobManagerAnnotations,
fmt.Sprintf("spec.jobManager.podAnnotations: Invalid value: \"%s\": name part must be no more than 63 characters", longName),
},
{
"invalid jm labels",
invalidJobManagerLabels,
fmt.Sprintf("spec.jobManager.podLabels: Invalid value: \"%s\": name part must be no more than 63 characters", longName),
},
{
"invalid tm annotations",
invalidTaskManagerAnnotations,
fmt.Sprintf("spec.taskManager.podAnnotations: Invalid value: \"%s\": name part must be no more than 63 characters", longName),
},
{
"invalid tm labels",
invalidTaskManagerLabels,
fmt.Sprintf("spec.taskManager.podLabels: Invalid value: \"%s\": name part must be no more than 63 characters", longName),
},
{
"invalid job annotations",
invalidJobAnnotations,
fmt.Sprintf("spec.job.podAnnotations: Invalid value: \"%s\": name part must be no more than 63 characters", longName),
},
{
"invalid job labels",
invalidJobLabels,
fmt.Sprintf("spec.job.podLabels: Invalid value: \"%s\": name part must be no more than 63 characters", longName),
},
}

for _, tt := range data {
t.Run(tt.testName, func(t *testing.T) {
err := validator.ValidateCreate(tt.run())
if err != nil {
assert.Equal(t, err.Error(), tt.expectedErr)
}
})
}
}

0 comments on commit b036b9a

Please sign in to comment.