Improve inherited annotations #2657

Merged · 11 commits · Jun 26, 2024
28 changes: 19 additions & 9 deletions e2e/tests/test_e2e.py
@@ -909,25 +909,35 @@ def test_ignored_annotations(self):
'''
k8s = self.k8s

annotation_patch = {
"metadata": {
"annotations": {
"k8s-status": "healthy"
},
}
}

try:
patch_config_ignored_annotations = {
"data": {
"ignored_annotations": "k8s-status",
}
}
k8s.update_config(patch_config_ignored_annotations)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

sts = k8s.api.apps_v1.read_namespaced_stateful_set('acid-minimal-cluster', 'default')
svc = k8s.api.core_v1.read_namespaced_service('acid-minimal-cluster', 'default')

annotation_patch = {
"metadata": {
"annotations": {
"k8s-status": "healthy"
},
}
}

old_sts_creation_timestamp = sts.metadata.creation_timestamp
k8s.api.apps_v1.patch_namespaced_stateful_set(sts.metadata.name, sts.metadata.namespace, annotation_patch)
svc = k8s.api.core_v1.read_namespaced_service('acid-minimal-cluster', 'default')
old_svc_creation_timestamp = svc.metadata.creation_timestamp
k8s.api.core_v1.patch_namespaced_service(svc.metadata.name, svc.metadata.namespace, annotation_patch)

patch_config_ignored_annotations = {
"data": {
"ignored_annotations": "k8s-status",
"ignored_annotations": "k8s-status, foo",
}
}
k8s.update_config(patch_config_ignored_annotations)
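
The updated test first tells the operator to ignore the k8s-status annotation, patches that annotation onto the statefulset and service, and then widens the ignore list to "k8s-status, foo". As a rough, self-contained sketch of how such a comma-separated option could be parsed into a lookup set (function name and shape are assumptions, not the operator's actual code):

package main

import (
	"fmt"
	"strings"
)

// parseIgnoredAnnotations splits a comma-separated config value into a
// set of annotation keys to skip during comparison. Illustrative only.
func parseIgnoredAnnotations(raw string) map[string]bool {
	ignored := make(map[string]bool)
	for _, key := range strings.Split(raw, ",") {
		if key = strings.TrimSpace(key); key != "" {
			ignored[key] = true
		}
	}
	return ignored
}

func main() {
	fmt.Println(parseIgnoredAnnotations("k8s-status, foo"))
	// Output: map[foo:true k8s-status:true]
}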
1 change: 1 addition & 0 deletions manifests/operator-service-account-rbac.yaml
@@ -102,6 +102,7 @@ rules:
- delete
- get
- update
- patch
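# patch presumably allows the operator to update metadata such as annotations in place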
# to check nodes for node readiness label
- apiGroups:
- ""
87 changes: 43 additions & 44 deletions pkg/cluster/cluster.go
@@ -30,6 +30,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
apipolicyv1 "k8s.io/api/policy/v1"
policyv1 "k8s.io/api/policy/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -400,7 +401,7 @@ func (c *Cluster) Create() (err error) {

if len(c.Spec.Streams) > 0 {
// creating streams requires syncing the statefulset first
err = c.syncStatefulSet()
err = c.syncStatefulSet(true)
if err != nil {
return fmt.Errorf("could not sync statefulset: %v", err)
}
@@ -433,6 +434,12 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
reasons = append(reasons, "new statefulset's pod management policy do not match")
}

if c.Statefulset.Spec.PersistentVolumeClaimRetentionPolicy == nil {
c.Statefulset.Spec.PersistentVolumeClaimRetentionPolicy = &appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy{
WhenDeleted: appsv1.RetainPersistentVolumeClaimRetentionPolicyType,
WhenScaled: appsv1.RetainPersistentVolumeClaimRetentionPolicyType,
}
}
if !reflect.DeepEqual(c.Statefulset.Spec.PersistentVolumeClaimRetentionPolicy, statefulSet.Spec.PersistentVolumeClaimRetentionPolicy) {
match = false
needsReplace = true
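
The defaulting above guards the DeepEqual check that follows: a StatefulSet read back from the API server carries the server-defaulted Retain/Retain retention policy, while a freshly generated spec can leave the field nil. A standalone illustration of the spurious diff this avoids (not operator code):

package main

import (
	"fmt"
	"reflect"

	appsv1 "k8s.io/api/apps/v1"
)

func main() {
	// A generated spec may leave the field unset ...
	var generated *appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy
	// ... while the API server defaults it to Retain/Retain.
	applied := &appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy{
		WhenDeleted: appsv1.RetainPersistentVolumeClaimRetentionPolicyType,
		WhenScaled:  appsv1.RetainPersistentVolumeClaimRetentionPolicyType,
	}
	fmt.Println(reflect.DeepEqual(generated, applied)) // false: spurious diff
}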
@@ -493,7 +500,6 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
if changed, reason := c.compareAnnotations(c.Statefulset.Spec.Template.Annotations, statefulSet.Spec.Template.Annotations); changed {
match = false
needsReplace = true
needsRollUpdate = true
reasons = append(reasons, "new statefulset's pod template metadata annotations does not match "+reason)
}
if !reflect.DeepEqual(c.Statefulset.Spec.Template.Spec.SecurityContext, statefulSet.Spec.Template.Spec.SecurityContext) {
@@ -513,9 +519,9 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
reasons = append(reasons, fmt.Sprintf("new statefulset's name for volume %d does not match the current one", i))
continue
}
if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations) {
if changed, reason := c.compareAnnotations(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations); changed {
needsReplace = true
reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %q does not match the current one", name))
reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %q does not match the current one: %s", name, reason))
}
if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Spec, statefulSet.Spec.VolumeClaimTemplates[i].Spec) {
name := c.Statefulset.Spec.VolumeClaimTemplates[i].Name
@@ -780,10 +786,6 @@ func (c *Cluster) compareServices(old, new *v1.Service) (bool, string) {
}
}

if changed, reason := c.compareAnnotations(old.Annotations, new.Annotations); changed {
return !changed, "new service's annotations does not match the current one:" + reason
}

return true, ""
}
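
With this check removed, service annotation drift is presumably handled by the sync path instead of making compareServices report a mismatch. For context, a minimal standalone sketch of an ignore-aware annotation comparison (assumed shape; the operator's real compareAnnotations may differ):

package main

import "fmt"

// compareAnnotationsSketch reports whether current and desired annotations
// differ, skipping any keys in the ignored set. Illustrative only.
func compareAnnotationsSketch(current, desired map[string]string, ignored map[string]bool) (bool, string) {
	for key, want := range desired {
		if ignored[key] {
			continue
		}
		if got, ok := current[key]; !ok || got != want {
			return true, fmt.Sprintf("annotation %q changed from %q to %q", key, got, want)
		}
	}
	for key := range current {
		if _, ok := desired[key]; !ok && !ignored[key] {
			return true, fmt.Sprintf("annotation %q has been removed", key)
		}
	}
	return false, ""
}

func main() {
	cur := map[string]string{"k8s-status": "healthy", "owner": "team-a"}
	des := map[string]string{"owner": "team-a"}
	ignored := map[string]bool{"k8s-status": true}
	changed, reason := compareAnnotationsSketch(cur, des, ignored)
	fmt.Println(changed, reason) // false — the only drift is on an ignored key
}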

@@ -801,6 +803,12 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool
newImage, curImage)
}

newPodAnnotation := new.Spec.JobTemplate.Spec.Template.Annotations
curPodAnnotation := cur.Spec.JobTemplate.Spec.Template.Annotations
if changed, reason := c.compareAnnotations(curPodAnnotation, newPodAnnotation); changed {
return false, fmt.Sprintf("new job's pod template metadata annotations does not match %s", reason)
}

newPgVersion := getPgVersion(new)
curPgVersion := getPgVersion(cur)
if newPgVersion != curPgVersion {
@@ -818,6 +826,17 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool
return true, ""
}

func (c *Cluster) comparePodDisruptionBudget(cur, new *apipolicyv1.PodDisruptionBudget) (bool, string) {
//TODO: improve comparison
if match := reflect.DeepEqual(new.Spec, cur.Spec); !match {
return false, "new PDB spec does not match the current one"
}
if changed, reason := c.compareAnnotations(cur.Annotations, new.Annotations); changed {
return false, "new PDB's annotations does not match the current one:" + reason
}
return true, ""
}
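
The new helper splits the PDB check into reflect.DeepEqual on the spec and compareAnnotations on metadata. A self-contained example of why the split matters, using the upstream k8s.io/api types (the values are made up):

package main

import (
	"fmt"
	"reflect"

	policyv1 "k8s.io/api/policy/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	minAvailable := intstr.FromInt(1)
	current := policyv1.PodDisruptionBudget{
		ObjectMeta: metav1.ObjectMeta{
			Annotations: map[string]string{"k8s-status": "healthy"},
		},
		Spec: policyv1.PodDisruptionBudgetSpec{MinAvailable: &minAvailable},
	}
	desired := policyv1.PodDisruptionBudget{
		Spec: policyv1.PodDisruptionBudgetSpec{MinAvailable: &minAvailable},
	}
	// Specs match, so only the annotation comparison (with its ignore
	// list) decides whether the PDB needs an update.
	fmt.Println(reflect.DeepEqual(current.Spec, desired.Spec)) // true
}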

func getPgVersion(cronJob *batchv1.CronJob) string {
envs := cronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Env
for _, env := range envs {
@@ -922,12 +941,9 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
}

// Service
if !reflect.DeepEqual(c.generateService(Master, &oldSpec.Spec), c.generateService(Master, &newSpec.Spec)) ||
!reflect.DeepEqual(c.generateService(Replica, &oldSpec.Spec), c.generateService(Replica, &newSpec.Spec)) {
if err := c.syncServices(); err != nil {
c.logger.Errorf("could not sync services: %v", err)
updateFailed = true
}
if err := c.syncServices(); err != nil {
c.logger.Errorf("could not sync services: %v", err)
updateFailed = true
}

// Users
@@ -946,15 +962,19 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
// only when streams were not specified in oldSpec but in newSpec
needStreamUser := len(oldSpec.Spec.Streams) == 0 && len(newSpec.Spec.Streams) > 0

if !sameUsers || !sameRotatedUsers || needPoolerUser || needStreamUser {
annotationsChanged, _ := c.compareAnnotations(oldSpec.Annotations, newSpec.Annotations)

initUsers := !sameUsers || !sameRotatedUsers || needPoolerUser || needStreamUser
if initUsers {
c.logger.Debugf("initialize users")
if err := c.initUsers(); err != nil {
c.logger.Errorf("could not init users - skipping sync of secrets and databases: %v", err)
userInitFailed = true
updateFailed = true
return
}

}
if initUsers || annotationsChanged {
c.logger.Debugf("syncing secrets")
//TODO: mind the secrets of the deleted/new users
if err := c.syncSecrets(); err != nil {
@@ -968,7 +988,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
if c.OpConfig.StorageResizeMode != "off" {
c.syncVolumes()
} else {
c.logger.Infof("Storage resize is disabled (storage_resize_mode is off). Skipping volume sync.")
c.logger.Infof("Storage resize is disabled (storage_resize_mode is off). Skipping volume size sync.")
}

// streams configuration
@@ -978,29 +998,11 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {

// Statefulset
func() {
oldSs, err := c.generateStatefulSet(&oldSpec.Spec)
if err != nil {
c.logger.Errorf("could not generate old statefulset spec: %v", err)
updateFailed = true
return
}

newSs, err := c.generateStatefulSet(&newSpec.Spec)
if err != nil {
c.logger.Errorf("could not generate new statefulset spec: %v", err)
if err := c.syncStatefulSet(syncStatefulSet); err != nil {
c.logger.Errorf("could not sync statefulsets: %v", err)
updateFailed = true
return
}

if syncStatefulSet || !reflect.DeepEqual(oldSs, newSs) {
c.logger.Debugf("syncing statefulsets")
syncStatefulSet = false
// TODO: avoid generating the StatefulSet object twice by passing it to syncStatefulSet
if err := c.syncStatefulSet(); err != nil {
c.logger.Errorf("could not sync statefulsets: %v", err)
updateFailed = true
}
}
syncStatefulSet = false
}()

// add or remove standby_cluster section from Patroni config depending on changes in standby section
@@ -1011,12 +1013,9 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
}

// pod disruption budget
if oldSpec.Spec.NumberOfInstances != newSpec.Spec.NumberOfInstances {
c.logger.Debug("syncing pod disruption budgets")
if err := c.syncPodDisruptionBudget(true); err != nil {
c.logger.Errorf("could not sync pod disruption budget: %v", err)
updateFailed = true
}
if err := c.syncPodDisruptionBudget(true); err != nil {
c.logger.Errorf("could not sync pod disruption budget: %v", err)
updateFailed = true
}

// logical backup job