Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: RestartPolicy serde results in different revision #497

Merged
merged 2 commits into from
Oct 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion apis/flinkcluster/v1beta1/flinkcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,8 @@ type JobSpec struct {
// the job will stay in failed state. This option is usually used together
// with `autoSavepointSeconds` and `savepointsDir`.
// +kubebuilder:default:=Never
RestartPolicy *JobRestartPolicy `json:"restartPolicy,omitempty"`
// TODO: this field should be omitempty but since it affects job revision we need to defer it to v1beta2
RestartPolicy *JobRestartPolicy `json:"restartPolicy"`

// The action to take after job finishes.
// +kubebuilder:default:={afterJobSucceeds:DeleteCluster, afterJobFails:KeepCluster, afterJobCancelled:DeleteCluster}
Expand Down
2 changes: 2 additions & 0 deletions config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1679,6 +1679,8 @@ spec:
- name
type: object
type: array
required:
- restartPolicy
type: object
jobManager:
default:
Expand Down
13 changes: 10 additions & 3 deletions controllers/flinkcluster/flinkcluster_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func TestNewRevision(t *testing.T) {
var jarFile = "gs://my-bucket/myjob.jar"
var parallelism int32 = 2
var savepointDir = "/savepoint_dir"
restartPolicy := v1beta1.JobRestartPolicyNever
var flinkCluster = v1beta1.FlinkCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "mycluster",
Expand Down Expand Up @@ -107,20 +108,26 @@ func TestNewRevision(t *testing.T) {
JarFile: &jarFile,
Parallelism: &parallelism,
SavepointsDir: &savepointDir,
RestartPolicy: &restartPolicy,
},
},
}

var patched = flinkCluster.DeepCopy()
// RestartPolicy is not part of the revision.
patched.Spec.Job.RestartPolicy = nil

var collisionCount int32 = 0
var controller = true
var blockOwnerDeletion = true
var raw, _ = getPatch(&flinkCluster)
var raw, _ = getPatch(patched)
var revision, _ = newRevision(&flinkCluster, 1, &collisionCount)
var expectedRevision = appsv1.ControllerRevision{
ObjectMeta: metav1.ObjectMeta{
Name: "mycluster-fb7687bf5",
Name: "mycluster-7bc87c954f",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New revisions trigger cluster updates! so this value change should be treated carefully. In this case this is changed to previous value.

Namespace: "default",
Labels: map[string]string{
"flinkoperator.k8s.io/hash": "fb7687bf5",
"flinkoperator.k8s.io/hash": "7bc87c954f",
"flinkoperator.k8s.io/managed-by": "mycluster",
},
Annotations: map[string]string{},
Expand Down