Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve inherited annotations #2657

Merged
merged 11 commits into from
Jun 26, 2024
33 changes: 19 additions & 14 deletions e2e/tests/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -909,22 +909,8 @@ def test_ignored_annotations(self):
'''
k8s = self.k8s

annotation_patch = {
"metadata": {
"annotations": {
"k8s-status": "healthy"
},
}
}

try:
sts = k8s.api.apps_v1.read_namespaced_stateful_set('acid-minimal-cluster', 'default')
old_sts_creation_timestamp = sts.metadata.creation_timestamp
k8s.api.apps_v1.patch_namespaced_stateful_set(sts.metadata.name, sts.metadata.namespace, annotation_patch)
svc = k8s.api.core_v1.read_namespaced_service('acid-minimal-cluster', 'default')
old_svc_creation_timestamp = svc.metadata.creation_timestamp
k8s.api.core_v1.patch_namespaced_service(svc.metadata.name, svc.metadata.namespace, annotation_patch)

patch_config_ignored_annotations = {
"data": {
"ignored_annotations": "k8s-status",
Expand All @@ -933,6 +919,25 @@ def test_ignored_annotations(self):
k8s.update_config(patch_config_ignored_annotations)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

sts = k8s.api.apps_v1.read_namespaced_stateful_set('acid-minimal-cluster', 'default')
svc = k8s.api.core_v1.read_namespaced_service('acid-minimal-cluster', 'default')

annotation_patch = {
"metadata": {
"annotations": {
"k8s-status": "healthy"
},
}
}

old_sts_creation_timestamp = sts.metadata.creation_timestamp
k8s.api.apps_v1.patch_namespaced_stateful_set(sts.metadata.name, sts.metadata.namespace, annotation_patch)
old_svc_creation_timestamp = svc.metadata.creation_timestamp
k8s.api.core_v1.patch_namespaced_service(svc.metadata.name, svc.metadata.namespace, annotation_patch)

k8s.delete_operator_pod()
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

sts = k8s.api.apps_v1.read_namespaced_stateful_set('acid-minimal-cluster', 'default')
new_sts_creation_timestamp = sts.metadata.creation_timestamp
svc = k8s.api.core_v1.read_namespaced_service('acid-minimal-cluster', 'default')
Expand Down
1 change: 1 addition & 0 deletions manifests/operator-service-account-rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ rules:
- delete
- get
- update
- patch
# to check nodes for node readiness label
- apiGroups:
- ""
Expand Down
91 changes: 41 additions & 50 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
apipolicyv1 "k8s.io/api/policy/v1"
policyv1 "k8s.io/api/policy/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -433,6 +434,12 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
reasons = append(reasons, "new statefulset's pod management policy do not match")
}

if c.Statefulset.Spec.PersistentVolumeClaimRetentionPolicy == nil {
c.Statefulset.Spec.PersistentVolumeClaimRetentionPolicy = &appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy{
WhenDeleted: appsv1.RetainPersistentVolumeClaimRetentionPolicyType,
WhenScaled: appsv1.RetainPersistentVolumeClaimRetentionPolicyType,
}
}
if !reflect.DeepEqual(c.Statefulset.Spec.PersistentVolumeClaimRetentionPolicy, statefulSet.Spec.PersistentVolumeClaimRetentionPolicy) {
match = false
needsReplace = true
Expand Down Expand Up @@ -493,7 +500,6 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
if changed, reason := c.compareAnnotations(c.Statefulset.Spec.Template.Annotations, statefulSet.Spec.Template.Annotations); changed {
match = false
needsReplace = true
needsRollUpdate = true
reasons = append(reasons, "new statefulset's pod template metadata annotations does not match "+reason)
}
if !reflect.DeepEqual(c.Statefulset.Spec.Template.Spec.SecurityContext, statefulSet.Spec.Template.Spec.SecurityContext) {
Expand All @@ -513,9 +519,9 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
reasons = append(reasons, fmt.Sprintf("new statefulset's name for volume %d does not match the current one", i))
continue
}
if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations) {
if changed, reason := c.compareAnnotations(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations); changed {
needsReplace = true
reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %q does not match the current one", name))
reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %q does not match the current one: %s", name, reason))
}
if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Spec, statefulSet.Spec.VolumeClaimTemplates[i].Spec) {
name := c.Statefulset.Spec.VolumeClaimTemplates[i].Name
Expand Down Expand Up @@ -780,10 +786,6 @@ func (c *Cluster) compareServices(old, new *v1.Service) (bool, string) {
}
}

if changed, reason := c.compareAnnotations(old.Annotations, new.Annotations); changed {
return !changed, "new service's annotations does not match the current one:" + reason
}

return true, ""
}

Expand All @@ -801,6 +803,12 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool
newImage, curImage)
}

newPodAnnotation := new.Spec.JobTemplate.Spec.Template.Annotations
curPodAnnotation := cur.Spec.JobTemplate.Spec.Template.Annotations
if changed, reason := c.compareAnnotations(curPodAnnotation, newPodAnnotation); changed {
return false, fmt.Sprintf("new job's pod template metadata annotations does not match " + reason)
}

newPgVersion := getPgVersion(new)
curPgVersion := getPgVersion(cur)
if newPgVersion != curPgVersion {
Expand All @@ -818,6 +826,17 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool
return true, ""
}

func (c *Cluster) comparePodDisruptionBudget(cur, new *apipolicyv1.PodDisruptionBudget) (bool, string) {
//TODO: improve comparison
if match := reflect.DeepEqual(new.Spec, cur.Spec); !match {
return false, "new PDB spec does not match the current one"
}
if changed, reason := c.compareAnnotations(cur.Annotations, new.Annotations); changed {
return false, "new PDB's annotations does not match the current one:" + reason
}
return true, ""
}

func getPgVersion(cronJob *batchv1.CronJob) string {
envs := cronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Env
for _, env := range envs {
Expand Down Expand Up @@ -883,7 +902,6 @@ func (c *Cluster) hasFinalizer() bool {
func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
updateFailed := false
userInitFailed := false
syncStatefulSet := false

c.mu.Lock()
defer c.mu.Unlock()
Expand Down Expand Up @@ -914,20 +932,16 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
if IsBiggerPostgresVersion(oldSpec.Spec.PostgresqlParam.PgVersion, c.GetDesiredMajorVersion()) {
c.logger.Infof("postgresql version increased (%s -> %s), depending on config manual upgrade needed",
oldSpec.Spec.PostgresqlParam.PgVersion, newSpec.Spec.PostgresqlParam.PgVersion)
syncStatefulSet = true
} else {
c.logger.Infof("postgresql major version unchanged or smaller, no changes needed")
// sticking with old version, this will also advance GetDesiredVersion next time.
newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion
}

// Service
if !reflect.DeepEqual(c.generateService(Master, &oldSpec.Spec), c.generateService(Master, &newSpec.Spec)) ||
!reflect.DeepEqual(c.generateService(Replica, &oldSpec.Spec), c.generateService(Replica, &newSpec.Spec)) {
if err := c.syncServices(); err != nil {
c.logger.Errorf("could not sync services: %v", err)
updateFailed = true
}
if err := c.syncServices(); err != nil {
c.logger.Errorf("could not sync services: %v", err)
updateFailed = true
}

// Users
Expand All @@ -946,15 +960,19 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
// only when streams were not specified in oldSpec but in newSpec
needStreamUser := len(oldSpec.Spec.Streams) == 0 && len(newSpec.Spec.Streams) > 0

if !sameUsers || !sameRotatedUsers || needPoolerUser || needStreamUser {
annotationsChanged, _ := c.compareAnnotations(oldSpec.Annotations, newSpec.Annotations)

initUsers := !sameUsers || !sameRotatedUsers || needPoolerUser || needStreamUser
if initUsers {
c.logger.Debugf("initialize users")
if err := c.initUsers(); err != nil {
c.logger.Errorf("could not init users - skipping sync of secrets and databases: %v", err)
userInitFailed = true
updateFailed = true
return
}

}
if initUsers || annotationsChanged {
c.logger.Debugf("syncing secrets")
//TODO: mind the secrets of the deleted/new users
if err := c.syncSecrets(); err != nil {
Expand All @@ -968,38 +986,14 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
if c.OpConfig.StorageResizeMode != "off" {
c.syncVolumes()
} else {
c.logger.Infof("Storage resize is disabled (storage_resize_mode is off). Skipping volume sync.")
}

// streams configuration
if len(oldSpec.Spec.Streams) == 0 && len(newSpec.Spec.Streams) > 0 {
syncStatefulSet = true
c.logger.Infof("Storage resize is disabled (storage_resize_mode is off). Skipping volume size sync.")
}

// Statefulset
func() {
oldSs, err := c.generateStatefulSet(&oldSpec.Spec)
if err != nil {
c.logger.Errorf("could not generate old statefulset spec: %v", err)
if err := c.syncStatefulSet(); err != nil {
c.logger.Errorf("could not sync statefulsets: %v", err)
updateFailed = true
return
}

newSs, err := c.generateStatefulSet(&newSpec.Spec)
if err != nil {
c.logger.Errorf("could not generate new statefulset spec: %v", err)
updateFailed = true
return
}

if syncStatefulSet || !reflect.DeepEqual(oldSs, newSs) {
c.logger.Debugf("syncing statefulsets")
syncStatefulSet = false
// TODO: avoid generating the StatefulSet object twice by passing it to syncStatefulSet
if err := c.syncStatefulSet(); err != nil {
c.logger.Errorf("could not sync statefulsets: %v", err)
updateFailed = true
}
}
}()

Expand All @@ -1011,12 +1005,9 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
}

// pod disruption budget
if oldSpec.Spec.NumberOfInstances != newSpec.Spec.NumberOfInstances {
c.logger.Debug("syncing pod disruption budgets")
if err := c.syncPodDisruptionBudget(true); err != nil {
c.logger.Errorf("could not sync pod disruption budget: %v", err)
updateFailed = true
}
if err := c.syncPodDisruptionBudget(true); err != nil {
c.logger.Errorf("could not sync pod disruption budget: %v", err)
updateFailed = true
}

// logical backup job
Expand Down
Loading
Loading